For Loop alternative to dynamically filter and summarise millions of points
# Question
I have a dataset with 5 million records in this format:
library(dplyr)
Data <- tibble(X=runif(5000000, min=200000, max=400000),
Y=runif(5000000, min=400000, max=500000),
UID = 1:5000000,
Count = runif(5000000, min=1, max=2000))
For each UID, I need to summarise: 1) the number of other points, 2) the count sum of the other points, and 3) a density calculation of UIDs, within 200m, 400m, 800m and 1600m. The final output required is a table with each UID and its summary output.
This for loop illustrates the output required, but would take weeks to complete:
Summary <- NULL
for (n in 1:nrow(Data)){
Data_Row <- Data[n,]
Row_200 <- Data %>% filter(between(X, Data_Row$X-200, Data_Row$X+200) & between(Y, Data_Row$Y-200, Data_Row$Y+200)) %>%
summarise(Instances_200 = n(), Count_200 = sum(Count), Calculation_200 = Count_200/(Instances_200*0.000625))
Row_400 <- Data %>% filter(between(X, Data_Row$X-400, Data_Row$X+400) & between(Y, Data_Row$Y-400, Data_Row$Y+400)) %>%
summarise(Instances_400 = n(), Count_400 = sum(Count), Calculation_400 = Count_400/(Instances_400*0.000625))
Row_800 <- Data %>% filter(between(X, Data_Row$X-800, Data_Row$X+800) & between(Y, Data_Row$Y-800, Data_Row$Y+800)) %>%
summarise(Instances_800 = n(), Count_800 = sum(Count), Calculation_800 = Count_800/(Instances_800*0.000625))
Row_1600 <- Data %>% filter(between(X, Data_Row$X-1600, Data_Row$X+1600) & between(Y, Data_Row$Y-1600, Data_Row$Y+1600)) %>%
summarise(Instances_1600 = n(), Count_1600 = sum(Count), Calculation_1600 = Count_1600/(Instances_1600*0.000625))
Summary_Row <- bind_cols(Data[n,"UID"], Row_200, Row_400, Row_800, Row_1600)
Summary <- bind_rows(Summary, Summary_Row)
}
I am aware of things like `lapply` and `map`, but I don't know how they could be used to do this dynamic filtering and summarisation more quickly.
# Answer 1
**Score:** 3
Based on @Jon Spring's comment I have used [this answer][1] as inspiration to speed up the process. The following answer now takes around 12 hours to run on my real data (as opposed to the days/weeks the code in my question would have taken). It is a bit memory hungry at points (I am using a Windows desktop with 32GB of RAM), which is why I have minimised what gets stored (i.e. why I write out each loop's output to CSV and then bring them back in at the end). Using `data.table` has definitely sped things up, but I am sure there is still a quicker/more efficient way to do this - but this does what I need (even if my code is messy!).
# geosphere v1.5-19 required
install.packages('geosphere', repos='https://rspatial.r-universe.dev')
# Required libraries
library(data.table)
library(dplyr)
library(geosphere)
library(tidyr)
library(readr)
library(purrr)
set.seed(123)
# Input data
Data <- data.table(
X=runif(5000000, min=200000, max=400000),
Y=runif(5000000, min=400000, max=500000),
UID = 1:5000000,
Count = sample(1:2000,5000000,replace=TRUE))
# Add 50m grid cell ID for each point
Data[, ID := OSGB(Data[,c("X", "Y")], "50m")]
# Summarise number of records for each 50m grid cell
Data_Summary <- Data[, .(Count = .N), by=.(ID)]
# Generate centroid coordinates for 50 metre grid squares that cover the 'Data' extent
Coordinates <- CJ(X = seq((floor(min(Data$X)/1000)*1000)+25, ceiling(max(Data$X)/1000)*1000, 50),
Y = seq((floor(min(Data$Y)/1000)*1000)+25, ceiling(max(Data$Y)/1000)*1000, 50))
# Add 50m grid cell ID for each centroid
Coordinates[, ID := OSGB(Coordinates[,c("X", "Y")], "50m")]
# Join 50m grid cell centroid coordinates to 'Data_Summary'
Data_Summary_with_Centroids <- Coordinates[Data_Summary, on = .(ID), nomatch = NULL]
# Define maximum buffer size
Buffer <- 1650
# Generate offset XY coordinates for each grid centroid
Data_Summary_with_Centroids[,`:=`(X_NW = X-Buffer, Y_NW = Y+Buffer,
X_NE = X+Buffer, Y_NE = Y+Buffer,
X_SW = X-Buffer, Y_SW = Y-Buffer,
X_SE = X+Buffer, Y_SE = Y-Buffer)]
# Assign 5km grid cell ID to all pairs of coordinates
Data_Summary_with_Centroids[,`:=`(HashCN = OSGB(Data_Summary_with_Centroids[,c("X", "Y")], "5km"),
HashNW = OSGB(Data_Summary_with_Centroids[,c("X_NW", "Y_NW")], "5km"),
HashNE = OSGB(Data_Summary_with_Centroids[,c("X_NE", "Y_NE")], "5km"),
HashSW = OSGB(Data_Summary_with_Centroids[,c("X_SW", "Y_SW")], "5km"),
HashSE = OSGB(Data_Summary_with_Centroids[,c("X_SE", "Y_SE")], "5km"))]
# Remove offset XY coordinates
Data_Summary_with_Centroids[, c("X_NW", "Y_NW","X_NE", "Y_NE","X_SW", "Y_SW","X_SE", "Y_SE"):=NULL]
# List of all unique 5km grid cell IDs
Unique_5km_Hash <- unique(Data_Summary_with_Centroids$HashCN)
XY_Columns <- c("X", "Y")
for (n in 1:length(Unique_5km_Hash)){
# Subset so only grid cells that have that loop's 5km grid cell ID in any of HashCN, HashNW, HashNE, HashSW or HashSE remain
Hash <- Data_Summary_with_Centroids[Data_Summary_with_Centroids[, Reduce(`|`, lapply(.SD, `==`, Unique_5km_Hash[n])),
.SDcols = c("HashCN", "HashNW","HashNE","HashSW","HashSE")]][, c("HashNW", "HashNE","HashSW", "HashSE"):=NULL]
# Select only the columns needed for 'crossing'
Hash_Cross <- Hash[, ..XY_Columns]
# Expand to include all possible combinations of values
suppressMessages(Hash_5km <- Hash_Cross %>%
crossing(., ., .name_repair = "unique"))
rm(Hash_Cross)
# Clear memory
invisible(gc())
# Calculate distances, discard those over 1600m, keep only 'origin' records that are in 'HashCN' and add Count
Hash_5km <- Hash_5km %>%
mutate(Distance = sqrt((X...1 - X...3)^2 + (Y...2 - Y...4)^2)) %>%
filter(Distance <=1600) %>%
left_join(.,Hash %>% select(X, Y, ID, HashCN), join_by(X...1 == X, Y...2 == Y)) %>%
filter(HashCN == Unique_5km_Hash[n]) %>%
left_join(.,Hash %>% select(X, Y, Count), join_by(X...3 == X, Y...4 == Y)) %>%
select(ID, Count, Distance)
# Clear memory
invisible(gc())
# Create summary statistics of other grid cells within 200m
Hash_200 <- Hash_5km %>%
filter(Distance <=200) %>%
group_by(ID) %>%
summarise(Instances_200 = n(), Count_200 = sum(Count), Calculation_200 = Count_200/(Instances_200*0.25))
# Create summary statistics of other grid cells within 400m
Hash_400 <- Hash_5km %>%
filter(Distance <=400) %>%
group_by(ID) %>%
summarise(Instances_400 = n(), Count_400 = sum(Count), Calculation_400 = Count_400/(Instances_400*0.25))
# Create summary statistics of other grid cells within 800m
Hash_800 <- Hash_5km %>%
filter(Distance <=800) %>%
group_by(ID) %>%
summarise(Instances_800 = n(), Count_800 = sum(Count), Calculation_800 = Count_800/(Instances_800*0.25))
# Create summary statistics of other grid cells within 1600m
Hash_1600 <- Hash_5km %>%
group_by(ID) %>%
summarise(Instances_1600 = n(), Count_1600 = sum(Count), Calculation_1600 = Count_1600/(Instances_1600*0.25))
# Join summary statistics together
Output <- Hash_200 %>%
left_join(Hash_400, by='ID') %>%
left_join(Hash_800, by='ID') %>%
left_join(Hash_1600, by='ID')
# Export output
write_csv(Output, paste0("C:/Temp/Hash/",Unique_5km_Hash[n],".csv"))
rm(Hash_5km)
rm(Hash)
rm(Hash_200)
rm(Hash_400)
rm(Hash_800)
rm(Hash_1600)
rm(Output)
invisible(gc())
}
# Read in all CSV files into a single data frame
Grid_Calculations <- list.files(path="C:/Temp/Hash/", pattern = "*.csv", full.names=TRUE) %>%
map_dfr(read_csv, show_col_types = FALSE)
**EDIT:** Have added set.seed() and slightly changed the start of the loop so only the relevant columns are used with 'crossing' (this makes it memory efficient for me).
**Output**
First five rows of first loop:
|ID|Instances_200|Count_200|Calculation_200|Instances_400|Count_400|Calculation_400|Instances_800|Count_800|Calculation_800|Instances_1600|Count_1600|Calculation_1600|
|--|-------------|---------|---------------|-------------|---------|---------------|-------------|---------|---------------|--------------|----------|----------------|
|SC550650NE|20|24|4.8|88|119|5.4090909090909|360|475|5.27777777777777|1503|2041|5.43180306054557|
|SC550650SW|22|28|5.09090909090909|87|119|5.47126436781609|360|475|5.27777777777777|1500|2049|5.464|
|SC550651NE|20|30|6|86|117|5.44186046511627|364|479|5.26373626373626|1494|2032|5.44042838018741|
|SC550651SE|21|30|5.71428571428571|85|113|5.31764705882352|364|479|5.26373626373626|1491|2037|5.46478873239436|
|SC550652NE|28|42|6|85|113|5.31764705882352|369|485|5.25745257452574|1487|2020|5.43375924680564|
[1]: https://stackoverflow.com/questions/72750975/fastest-way-to-get-geo-distance-for-large-dataset-in-r/72755385#72755385
# Answer 2
**Score:** 1
As someone who thinks there must be people who can do this better than me, I would look for solutions made by others.
So I see some advantages in using the `sf` and `spdep` packages. Both are explicitly designed for spatial data.
There are two functions to start with. Create an sf object:
x <- sf::st_as_sf(Data, coords = c("X", "Y"))
and the distances could be computed with st_distance().
Surely a solution has to be found that does not compute all the distances. A simple algorithm is to sort the points into bins of half the size of the desired radius. Then you only need to check the points in neighbouring bins.
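To make that last idea concrete, here is a minimal sketch of the binning approach - untested at the 5-million-row scale and not a tuned solution. It uses data.table rather than sf for brevity, assumes the square window from the question (|dx| <= r and |dy| <= r), and uses bins of width r (rather than r/2) so that every neighbour of a point lies in the point's own bin or one of the eight adjacent bins. The function and column names (summarise_radius, gx, gy, X_o and so on) are illustrative, not from this answer.
library(data.table)
summarise_radius <- function(dat, r) {
  pts <- copy(dat)
  pts[, `:=`(gx = floor(X / r), gy = floor(Y / r))]   # bin coordinates
  # Replicate every point into its own bin and the 8 adjacent bins, so a plain
  # equi-join on (gx, gy) recovers all candidate neighbours of a query point.
  shifts <- CJ(dx = -1:1, dy = -1:1)
  donors <- rbindlist(lapply(seq_len(nrow(shifts)), function(i) {
    pts[, .(gx = gx + shifts$dx[i], gy = gy + shifts$dy[i],
            X_o = X, Y_o = Y, Count_o = Count)]
  }))
  # Pull the candidates that share a bin with each point, apply the exact
  # window test, then summarise as in the question (0.000625 taken from there).
  cand <- donors[pts, on = .(gx, gy), allow.cartesian = TRUE]
  cand <- cand[abs(X_o - X) <= r & abs(Y_o - Y) <= r]
  cand[, .(Instances = .N,
           CountSum  = sum(Count_o),
           Calculation = sum(Count_o) / (.N * 0.000625)),
       by = UID]
}
# e.g. res_200 <- summarise_radius(as.data.table(Data), 200)   # repeat for 400, 800, 1600
On the full dataset the intermediate join is still large, so in practice one would process the bins in chunks (much as the answer above loops over 5km cells), but the candidate set per point stays small, which is what avoids computing all pairwise distances.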
# Answer 3
**Score:** 1
Using data.table could help, but for this case I think the computation time will still be far too long. Maybe you want to try it anyway:
library(data.table)
tbl.data <- as.data.table(Data)
tbl.data[, Instances_200 := mapply(function (x,y) {tbl.data[X<x+200 & X > x -200
& Y < y+200 & Y > y -200, .N ]}, X, Y)]
tbl.data[, Count_200 := mapply(function (x,y) {tbl.data[X<x+200 & X > x -200
& Y < y+200 & Y > y -200, sum(Count) ]}, X, Y)]
tbl.data[, Calculation_200 := Count_200/(Instances_200*0.000625)]
And so on.
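For completeness, here is a sketch of how the same pattern could be extended to the remaining radii with a small loop, so the three columns per radius are built programmatically. The dynamic column-name construction is my own illustration (and just as slow as the 200m case), not part of the original answer.
for (r in c(400, 800, 1600)) {
  inst_col  <- paste0("Instances_", r)
  count_col <- paste0("Count_", r)
  calc_col  <- paste0("Calculation_", r)
  # Count the points inside the +/- r window around each point
  tbl.data[, (inst_col) := mapply(function(x, y) {
    tbl.data[X < x + r & X > x - r & Y < y + r & Y > y - r, .N]
  }, X, Y)]
  # Sum their Count values
  tbl.data[, (count_col) := mapply(function(x, y) {
    tbl.data[X < x + r & X > x - r & Y < y + r & Y > y - r, sum(Count)]
  }, X, Y)]
  # Same density calculation as in the question
  tbl.data[, (calc_col) := get(count_col) / (get(inst_col) * 0.000625)]
}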
# Answer 4
**Score:** 1
I am trying to understand OP's answer post, specifically the output table.
Focusing on the first row of the output table -- I see that SC550650NE is reported to have 20 instances of another point within 200 of (X=255085.6, Y=465070.1) and a sum-count of 24.
> Data[ID=="SC550650NE"]
X Y UID Count ID distance
1: 255085.6 465070.1 832751 1908 SC550650NE 0.03272809
So I calculate the distance of all the points relative to (X=255085.6, Y=465070.1), and then display those that are within 200 in ascending order. I count 29, not 20. Also, when I sum the counts, I get 25,463, not 24.
> Data[,distance:=sqrt((X-255085.6)^2 + (Y-465070.1)^2)]
> Data[distance <= 200][order(distance)]
X Y UID Count ID distance
1: 255085.6 465070.1 832751 1908 SC550650NE 0.03272809
2: 255093.6 465124.7 1371253 1513 SC550651SE 55.16468957
3: 255034.2 465011.8 3512521 396 SC550650SW 77.73592845
4: 255049.8 464993.6 2603353 255 SC550649NW 84.47194005
5: 255091.7 465162.7 193893 1784 SC550651NE 92.82767523
6: 254968.1 465083.5 2245518 186 SC549650NE 118.25898594
7: 255180.5 464970.4 3376500 826 SC551649NE 137.66330325
8: 255148.5 464947.4 3473247 1526 SC551649SW 137.86046404
9: 255136.8 465202.7 2560816 212 SC551652SW 142.12504000
10: 255089.6 465224.3 2788575 1429 SC550652SE 154.24575868
11: 255213.6 465158.4 2051343 331 SC552651NW 155.52946932
12: 255234.1 465015.9 601896 1175 SC552650SW 158.08971475
13: 255231.4 464993.8 2562794 1209 SC552649NW 164.57602726
14: 255236.8 465003.9 2579822 624 SC552650SW 165.05736525
15: 255163.8 465219.0 2793140 491 SC551652SE 168.19129349
16: 254997.8 465224.2 3686855 1948 SC549652SE 177.38964783
17: 255055.7 464895.0 3850088 1904 SC550648NE 177.60704244
18: 254952.1 465189.1 4992168 668 SC549651NE 178.80180844
19: 255237.6 465165.5 3778801 950 SC552651NW 179.50253647
20: 255148.1 464900.4 2842521 204 SC551649SW 180.86815284
21: 255035.4 464891.2 933305 854 SC550648NW 185.79738782
22: 255245.6 465175.2 1453263 446 SC552651NW 191.44182598
23: 255227.8 465200.5 1557782 851 SC552652SW 192.90236356
24: 254894.7 465031.3 657336 1247 SC548650SE 194.79510938
25: 255102.8 464875.4 3051082 148 SC551648NW 195.41964698
26: 254914.2 464971.4 3090917 28 SC549649NW 197.75429388
27: 255030.4 464879.7 895456 109 SC550648NW 198.20424559
28: 255281.9 465101.9 3970383 1810 SC552651SE 198.85139040
29: 255108.3 465268.3 1968621 431 SC551652NW 199.51847902
X Y UID Count ID distance
> Data[distance <= 200, sum(Count)]
[1] 25463
**Edit: 5 million rows**
So after ill-advisedly using the comments for discussion (sorry, SO!) we have a better sense of our situation. @Chris has a solution that gives an approximation but this was a concession that was being made to try and lower the computation time. During the discussion, we also learned that the distance metric may be a square or a circle, and 4 cores are available on the machine.
I will present two solutions, one for the circle metric and one for the square metric. The circle metric is faster.
Both follow the approach in [this SO post](https://stackoverflow.com/a/44814035/2727349).
**Circle metric** -- e.g. `distance = sqrt((x1-x2)^2 + (y1-y2)^2) <= 200`
library(tictoc)
library(data.table)
Pick one of the `num.rows` values below. Start small and work up to get a sense of timing. I have 32GB as well (on a Mac) so I'm hoping it is comparable. Roughly it seemed that doubling the number of rows led to a 4x increase in computation time, which is consistent with the pairwise nature of the problem.
### Pick one:
###
### num.rows <- 4e4 ### 12 seconds on 4 nodes
### num.rows <- 0.078125e6 ### 43 seconds on 4 nodes
### num.rows <- 0.15625e6 ### 154 seconds on 4 nodes
### num.rows <- 0.3125e6 ### 10.5 min on 4 nodes
### num.rows <- 0.625e6 ### 44. min on 4 nodes
### num.rows <- 1.25e6 ### ~3 hours
### num.rows <- 2.5e6 ### ~12 hours
## num.rows <- 5.0e6 ### ~ 42.53 hours on 4 nodes VERIFIED.
set.seed(123)
# Input data
tbl.data <- data.table(
X=runif(num.rows, min=200000, max=400000),
Y=runif(num.rows, min=400000, max=500000),
UID = 1:num.rows,
Count = sample(1:2000,num.rows,replace=TRUE))
setkey(tbl.data, UID)
tbl.data
setkey(tbl.data, UID)
This approach depends on passing the entire X, Y, and Count columns of the dataset. We denote them `Xvec`, `Yvec`, and `Cvec`, respectively. From there we make a function, `instances()`, that will first calculate the distance of one point `(x.in, y.in)` to all the other points in (`Xvec`, `Yvec`), then calculate the instances and the sumCount (the density "calculation" is done at the end and doesn't need to be done within this step or be parallelized).
This function, `instances()`, is made to be deployed within a data.table, and is set up to do so in the function `instances_stacked()`. Since we are passing the entire X, Y, and Count columns of the dataset and operating row-wise in the original dataset, we can tackle the problem in parallel. If we had a `parallel::detectCores()` of 5 million, we'd have one core per row and be done in a jiffy. However, we have 4, so we take a moment to make a variable `group_quarters` that will split the dataset into 4 chunks via `foo2()`'s deployment of `split_df()` (both adapted from the example in the aforementioned SO post).
Xvec <- tbl.data$X
Yvec <- tbl.data$Y
Cvec <- tbl.data$Count
## based on https://stackoverflow.com/a/44814035/2727349
instances <- function(x=Xvec,y=Yvec, count=Cvec, x.in, y.in){
calcdist <- sqrt( (x-x.in)^2+ (y-y.in)^2 )
truth_1600 <- calcdist <= 1600
result_1600 <- c("instances_1600" = sum( truth_1600 ), "sumCount_1600"= sum( Cvec*truth_1600))
truth_0800 <- calcdist <= 800
result_0800 <- c("instances_0800" = sum( truth_0800 ), "sumCount_0800"= sum( Cvec*truth_0800))
truth_0400 <- calcdist <= 400
result_0400 <- c("instances_0400" = sum( truth_0400 ), "sumCount_0400"= sum( Cvec*truth_0400))
truth_0200 <- calcdist <= 200
result_0200 <- c("instances_0200" = sum( truth_0200 ), "sumCount_0200"= sum( Cvec*truth_0200))
result=c(result_0200, result_0400, result_0800, result_1600)
return(result)
}
instances_stacked <- function(DT, x=Xvec,y=Yvec, count=Cvec){
DT[,
as.list(instances( x.in=X, y.in=Y)),
by=c("UID")
]
}
split_df <- function(d, var) {
base::split(d, get(var, as.environment(d)))
}
## define group_quarters roughly as 25% population
tbl.data[UID < 0.25 * num.rows , group_quarters:="1st q"]
tbl.data[UID < 0.50 * num.rows & UID >= 0.25 * num.rows, group_quarters:="2nd q"]
tbl.data[UID < 0.75 * num.rows & UID >= 0.50 * num.rows, group_quarters:="3rd q"]
tbl.data[ UID >= 0.75 * num.rows, group_quarters:="4th q"]
tbl.data[, uniqueN(group_quarters)]
tbl.data[, table(group_quarters)]
foo2 <- function(dt) {
dt2 <- split_df(dt, "group_quarters")
require(parallel)
cl <- parallel::makeCluster(4) ## 4 chosen based on parallel::detectCores()
print(cl)
clusterExport(cl, varlist= "instances_stacked") ## this was the `foo` in https://stackoverflow.com/a/44814035/2727349
clusterExport(cl, varlist= "instances")
clusterExport(cl, varlist= c("Xvec","Yvec","Cvec"))
clusterExport(cl, varlist= "dt2", envir = environment())
clusterEvalQ(cl, library("data.table"))
dt2 <- parallel::parLapply(cl, X= dt2, fun=instances_stacked)
parallel::stopCluster(cl)
return(rbindlist(dt2))
}
The call to `foo2()` will deploy the analysis in parallel and combine everything back together into `results_wide`. We can then do the density calculation and view some results.
tic("foo2() runs on 4 data chunks (1/4 sized) in parallel: ")
results_wide <- foo2(tbl.data)
## next 4 lines done after the parallel part
results_wide[,calculation_1600:=sumCount_1600/(instances_1600*0.25)]
results_wide[,calculation_0800:=sumCount_0800/(instances_0800*0.25)]
results_wide[,calculation_0400:=sumCount_0400/(instances_0400*0.25)]
results_wide[,calculation_0200:=sumCount_0200/(instances_0200*0.25)]
toc()
## let's see the results
head(results_wide)
## for num.rows<-5e6,
## should match the intro example at top of the post:
results_wide[UID==832751]
Which gives (after 42 1/2 hours):
> head(results_wide)
UID instances_0200 sumCount_0200 instances_0400 sumCount_0400 instances_0800
1: 1 28 25348 115 110519 510
2: 2 38 37791 123 128978 498
3: 3 28 26958 123 131816 506
4: 4 33 33861 125 113013 514
5: 5 31 36492 123 130939 522
6: 6 37 40887 136 141213 480
sumCount_0800 instances_1600 sumCount_1600 calculation_1600 calculation_0800
1: 521004 2025 2015967 3982.157 4086.306
2: 499358 2001 1999339 3996.680 4010.908
3: 507117 2013 2003329 3980.783 4008.830
4: 513712 2065 2115524 4097.867 3997.759
5: 516877 2054 1990644 3876.619 3960.743
6: 491969 1952 2001461 4101.355 4099.742
calculation_0400 calculation_0200
1: 3844.139 3621.143
2: 4194.407 3978.000
3: 4286.699 3851.143
4: 3616.416 4104.364
5: 4258.179 4708.645
6: 4153.324 4420.216
## for num.rows <- 5e6, should match the
## intro example at top of the post for
## Data[ID=="SC550650NE"]
> results_wide[UID==832751]
UID instances_0200 sumCount_0200 instances_0400 sumCount_0400
1: 832751 29 25463 122 110993
instances_0800 sumCount_0800 instances_1600 sumCount_1600 calculation_1600
1: 480 467032 2048 2054509 4012.713
calculation_0800 calculation_0400 calculation_0200
1: 3891.933 3639.115 3512.138
**Square metric** -- e.g. the question post's `(X-200, X+200) & (Y-200, Y+200)` bounds.
**NOTE:** I overloaded some function names, so it might be best to put this code in a separate script and a separate R session from the circle-metric code.
Similar idea to the circle-metric approach above, but the execution is slightly different because it is based on the bounds as opposed to the distance. Basically, we make a super stack of the data and calculate the boundaries specific to each distance. These four "boundaries" make a natural candidate for chunking the data for parallelizing -- this code goes one step farther and breaks each of those 4 in half, resulting in 8 groups. We still run on 4 nodes, though. I tested a bunch on 8 nodes because my testing environment allowed for this, and I developed this code before I knew the OP had a 4-core environment.
library(data.table)
library(tictoc)
library(dplyr)
set.seed(123)
### Pick one:
### num.rows <- 4e4 ### 15 seconds on 8 nodes | 23 seconds on 4 nodes
num.rows <- 1e5 ### 86 seconds on 8 nodes | 140.034 seconds on 4 nodes
### num.rows <- 2e5 ### 347 seconds on 8 nodes
### num.rows <- 4e5 ### 1370 seconds on 8 nodes
### num.rows <- 8e5 ### 1370*4 = 90 minutes?
### num.rows <- 1.6e6 ### 1370*4*4 = 360 minutes = 6 hours (confirmed: 21742.233 / 60 / 60 = 6 hours)
### num.rows <- 2.5e6 ### 47523 / 60 / 60 = 13 hours and 12 minutes VERIFIED.
### num.rows <- 5.0e6 ### 4*47523 / 60 / 60 = ~53 hours estimated.
# Input data
tbl.data <- data.table(
X=runif(num.rows, min=200000, max=400000),
Y=runif(num.rows, min=400000, max=500000),
UID = 1:num.rows,
Count = sample(1:2000,num.rows,replace=TRUE))
setkey(tbl.data, UID)
tbl.data
setkey(tbl.data, UID)
Make a superstack:
tbl.data.1600 <- copy(tbl.data)
tbl.data.0800 <- copy(tbl.data)
tbl.data.0400 <- copy(tbl.data)
tbl.data.0200 <- copy(tbl.data)
tbl.data.1600[, within_metric:=1600]
tbl.data.0800[, within_metric:= 800]
tbl.data.0400[, within_metric:= 400]
tbl.data.0200[, within_metric:= 200]
tbl.data.1600[, group:=1]
tbl.data.0800[, group:=2]
tbl.data.0400[, group:=3]
tbl.data.0200[, group:=4]
tbl.data.stack <- copy(rbind(tbl.data.1600, tbl.data.0800, tbl.data.0400, tbl.data.0200))
setkey(tbl.data.stack, group, UID)
tbl.data.stack[,X_low_bound:=X-within_metric]
tbl.data.stack[,X_upp_bound:=X+within_metric]
tbl.data.stack[,Y_low_bound:=Y-within_metric]
tbl.data.stack[,Y_upp_bound:=Y+within_metric]
Xvec <- tbl.data.stack[group==1]$X
Yvec <- tbl.data.stack[group==1]$Y
Cvec <- tbl.data.stack[group==1]$Count
I include this commented chunk -- this is how one could run it without parallelizing it. Go ahead and skip for now to the next block.
###################
## BEGIN ##########
###################
## no parallel. ###
## just stacked ###
###################
# instances_stacked <- function(x=Xvec,y=Yvec, count=Cvec, xlb, xub, ylb, yub){
#
# truth <- data.table::between(x, xlb, xub) & data.table::between(y, ylb, yub)
#
# c("instances" = sum( truth ), "sumCount"= sum( Cvec*truth))
#
# }
#
#
# tic("external default vector approach -- STACKED -- all 4: ")
# results_stacked <-
# tbl.data.stack[,
# as.list(instances_stacked( xlb=X_low_bound, xub=X_upp_bound, ylb=Y_low_bound, yub=Y_upp_bound)),
# by=c("group","UID")
# ]
#
# results_stacked[,calculation:=sumCount/(instances*0.25)]
# toc()
#
# head(results_stacked)
###################
## END. ##########
###################
## no parallel. ###
## just stacked ###
###################
Below is the parallel code. I broke the data into 8 groups, but changed the makeCluster line to 4 nodes for your computer.
###################
## BEGIN ##########
###################
## SOCKET. ###
## ROCKET. ###
###################
## follow the leader: https://stackoverflow.com/a/44814035/2727349
## step 1: change instances_stacked (which is our `foo`) so that it returns a list. Also change it to where
## it takes data table as an argument.
## step 2: use split_df -- keep in mind might need to make a concatenated grouping of "group" and "UID"
instances <- function(x=Xvec,y=Yvec, count=Cvec, xlb, xub, ylb, yub){
truth <- data.table::between(x, xlb, xub) & data.table::between(y, ylb, yub)
c("instances" = sum( truth ), "sumCount"= sum( Cvec*truth))
}
instances_stacked <- function(DT, x=Xvec,y=Yvec, count=Cvec){
DT[,
as.list(instances( xlb=X_low_bound, xub=X_upp_bound, ylb=Y_low_bound, yub=Y_upp_bound)),
by=c("within_metric","UID")
]
}
split_df <- function(d, var) {
base::split(d, get(var, as.environment(d)))
}
## define grps to be distance-bound and UID combo
tbl.data.stack[,grps:=paste0(within_metric,"_",UID)]
tbl.data.stack[, uniqueN(grps)]
## define group_half to be distance-bound and UID < 1/2 num.rows
tbl.data.stack[,group_half:=paste0(within_metric,"_", (UID < 0.5 * num.rows)+0L)]
tbl.data.stack[, uniqueN(group_half)]
foo2 <- function(dt) {
#dt2 <- split_df(dt, "grps")
#dt2 <- split_df(dt, "group")
dt2 <- split_df(dt, "group_half")
require(parallel)
##cl <- parallel::makeCluster(min(nrow(dt), parallel::detectCores()))
#cl <- parallel::makeCluster(tbl.data.stack[, uniqueN(group_half)])
cl <- parallel::makeCluster(4)
print(cl)
clusterExport(cl, varlist= "instances_stacked") ## this was the `foo` in https://stackoverflow.com/a/44814035/2727349
clusterExport(cl, varlist= "instances")
clusterExport(cl, varlist= c("Xvec","Yvec","Cvec"))
clusterExport(cl, varlist= "dt2", envir = environment())
clusterEvalQ(cl, library("data.table"))
dt2 <- parallel::parLapply(cl, X= dt2, fun=instances_stacked)
parallel::stopCluster(cl)
return(rbindlist(dt2))
}
tic("external default vector approach -- SOCKET -- all 4 in parallel: ")
results_stacked_socket <- foo2(tbl.data.stack)
results_stacked_socket[,calculation:=sumCount/(instances*0.25)]
toc()
setkey(results_stacked_socket, UID, within_metric)
head(results_stacked_socket)
###################
## END. ##########
###################
## SOCKET. ###
## ROCKET. ###
###################