Prototyping Distributed Computations

Introduction

The distcomp package allows one to perform computations on data that is row-partitioned data among several sites. Implemented methods include stratified Cox regression and singular value decomposition (SVD). The distributed computations rely on a REST API for R, such as that provided by Opencpu for example.

In order to prototype new methods for distcomp, it is convenient to write code that does all computations locally, at least during the development phase. This allows for easy debugging and code changes. The step to using a REST call is merely one step off. For example, in the context of distributed multiple imputation, the distributed SVD is a building block. So one has to be able to be able to run the svd operation not only in a modular fashion, but also quickly to test code.

Previous versions of distcomp assumed all computations were done using opencpu remote calls. This meant that the results were time-consuming to check; for reasonably sized problems one would have to wait for hours.

Version 1.1 of distcomp addresses this by allowing local computations, in essence faking the opencpu calls. All calculations are done locally at normal speeds (rather than a https POST request) while ensuring that the sites are only privy to their own data.

An Example

available <- availableComputations()
( computationNames <- names(available) )
## [1] "QueryCount"         "StratifiedCoxModel" "RankKSVD"

We will perform an SVD on a 1500 by 20 matrix where the matrix is distributed across three sites each having a 500 by 20 matrix. We will extract the first 10 components using this distributed algorithm.

We first define the computation.

svdDef <- data.frame(compType = "RankKSVD",
                     rank = 20L,
                     ncol = 20L,
                     id = "SVD",
                     stringsAsFactors = FALSE)

We generate some synthetic data.

set.seed(12345)
## Three sites
nSites <- 3
siteData <- lapply(seq.int(nSites), function(i) matrix(rnorm(10000), ncol=20))

We now create local objects to represent the sites, a list of three items, each with a name and its specific data.

sites <- lapply(seq.int(nSites),
                function(x) list(name = paste0("site", x),
                                 worker = makeWorker(defn = svdDef, data = siteData[[x]])
                                 ))

We are now ready to create a master object and add sites to it.

master <- makeMaster(svdDef)
for (site in sites) {
  master$addSite(name = site$name, worker = site$worker)
}

When any of the sites specify a worker argument in the call to addSite, it is an indication that the computation is to be performed locally. Otherwise, a URL will have to be specified using the url argument to addSite. (Only one of worker or url can be specified.)

Finally, we are ready to run the decomposition, specifying a maximum number of iterations, so that things don’t go haywire. This takes less than 5 seconds on my laptop.

system.time(result <- master$run(max.iter = 10000))
##    user  system elapsed 
##   7.840  12.968   5.256

The result is a named list of two things, the d diagonal values and the v matrix.

We can compare the results obtained here with the actual.

full_data <- do.call(rbind, siteData)
full_svd <- svd(full_data)

Let’s print it side-by-side.

d_table <- data.frame(truth = full_svd$d, distcomp = result$d)
knitr::kable(d_table)
truth distcomp
41.82263 41.82263
41.65242 41.65242
41.32027 41.32027
40.88629 40.88629
40.41849 40.41849
40.27631 40.27631
40.12698 40.12698
39.44775 39.44775
39.24815 39.24815
39.07074 39.07074
38.37076 38.37076
38.21350 38.21350
37.61047 37.61047
37.14286 37.14286
36.98763 36.98763
36.24778 36.24778
36.04962 36.04962
35.66050 35.66050
35.21343 35.21343
34.56329 34.56329

We can also compare the v matrix, taking into account the signs may be different.

norm(abs(result$v) - abs(full_svd$v), "F")
## [1] 1.124008e-05

Stratified Cox Model

The approach for the stratified Cox model is similar. We start with a definition.

coxDef <- data.frame(compType = "StratifiedCoxModel",
                     formula = "Surv(time, censor) ~ age + becktota + ndrugfp1 + ndrugfp2 + ivhx3 + race + treat",
                     projectName = "STCoxTest",
                     projectDesc = "STCox Project Desc",
                     stringsAsFactors = FALSE)

We now create two sites with appropriate data

## Two sites
siteDataFiles <- file.path(system.file("ex", package="distcomp"), c("uis-site1.csv", "uis-site2.csv"))
siteData <- lapply(siteDataFiles, read.csv)

We now create local objects to represent the two sites, each with a name and its specific data.

sites <- lapply(seq_along(siteData),
                function(x) list(name = paste0("site", x),
                                 worker = makeWorker(defn = coxDef, data = siteData[[x]])
                                 ))

We are now ready to create a master object and add sites to it.

master <- makeMaster(coxDef)
for (site in sites) {
  master$addSite(name = site$name, worker = site$worker)
}

Now we can run the model.

result <- master$run()
print(master$summary(), digits = 4)
##        coef exp(coef) se(coef)      z         p
## 1 -0.028076    0.9723 0.008131 -3.453 5.542e-04
## 2  0.009146    1.0092 0.004991  1.832 6.691e-02
## 3 -0.521973    0.5933 0.124424 -4.195 2.727e-05
## 4 -0.194178    0.8235 0.048252 -4.024 5.717e-05
## 5  0.263634    1.3017 0.108243  2.436 1.487e-02
## 6 -0.240021    0.7866 0.115632 -2.076 3.792e-02
## 7 -0.212616    0.8085 0.093747 -2.268 2.333e-02

The above result will be the same as what we obtain from a Cox regression fit.

coxFit <- survival::coxph(formula=Surv(time, censor) ~ age + becktota + ndrugfp1 + ndrugfp2 +
                              ivhx3 + race + treat + strata(site),
                          data = rbind(siteData[[1]], siteData[[2]]))
summary(coxFit)$coefficients
##                   coef exp(coef)    se(coef)         z     Pr(>|z|)
## age       -0.028075893 0.9723146 0.008130685 -3.453078 5.542280e-04
## becktota   0.009145528 1.0091875 0.004991421  1.832250 6.691425e-02
## ndrugfp1  -0.521973047 0.5933487 0.124423881 -4.195119 2.727278e-05
## ndrugfp2  -0.194177573 0.8235117 0.048252289 -4.024215 5.716573e-05
## ivhx3TRUE  0.263634281 1.3016521 0.108243388  2.435569 1.486837e-02
## race      -0.240020862 0.7866115 0.115632433 -2.075723 3.791961e-02
## treat     -0.212616368 0.8084662 0.093747124 -2.267978 2.333058e-02

Session Info

sessionInfo()
## R version 4.4.1 (2024-06-14)
## Platform: x86_64-pc-linux-gnu
## Running under: Ubuntu 24.04.1 LTS
## 
## Matrix products: default
## BLAS:   /usr/lib/x86_64-linux-gnu/openblas-pthread/libblas.so.3 
## LAPACK: /usr/lib/x86_64-linux-gnu/openblas-pthread/libopenblasp-r0.3.26.so;  LAPACK version 3.12.0
## 
## locale:
##  [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C              
##  [3] LC_TIME=en_US.UTF-8        LC_COLLATE=C              
##  [5] LC_MONETARY=en_US.UTF-8    LC_MESSAGES=en_US.UTF-8   
##  [7] LC_PAPER=en_US.UTF-8       LC_NAME=C                 
##  [9] LC_ADDRESS=C               LC_TELEPHONE=C            
## [11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C       
## 
## time zone: Etc/UTC
## tzcode source: system (glibc)
## 
## attached base packages:
## [1] stats     graphics  grDevices utils     datasets  methods   base     
## 
## other attached packages:
## [1] distcomp_1.3-4 survival_3.7-0
## 
## loaded via a namespace (and not attached):
##  [1] Matrix_1.7-1      jsonlite_1.8.9    dplyr_1.1.4       compiler_4.4.1   
##  [5] promises_1.3.0    tidyselect_1.2.1  Rcpp_1.0.13       stringr_1.5.1    
##  [9] later_1.3.2       jquerylib_0.1.4   splines_4.4.1     yaml_2.3.10      
## [13] fastmap_1.2.0     sodium_1.3.2      mime_0.12         lattice_0.22-6   
## [17] R6_2.5.1          generics_0.1.3    knitr_1.48        tibble_3.2.1     
## [21] maketools_1.3.1   shiny_1.9.1       bslib_0.8.0       pillar_1.9.0     
## [25] rlang_1.1.4       utf8_1.2.4        stringi_1.8.4     cachem_1.1.0     
## [29] httpuv_1.6.15     xfun_0.48         sass_0.4.9        sys_3.4.3        
## [33] cli_3.6.3         magrittr_2.0.3    digest_0.6.37     grid_4.4.1       
## [37] xtable_1.8-4      gmp_0.7-5         lifecycle_1.0.4   vctrs_0.6.5      
## [41] evaluate_1.0.1    glue_1.8.0        homomorpheR_0.2-5 buildtools_1.0.0 
## [45] fansi_1.0.6       httr_1.4.7        rmarkdown_2.28    pkgconfig_2.0.3  
## [49] tools_4.4.1       htmltools_0.5.8.1