--- title: "Prototyping Distributed Computations" author: "Balasubramanian Narasimhan" date: '`r Sys.Date()`' output: html_document: fig_caption: yes theme: cerulean toc: yes toc_depth: 2 vignette: > %\VignetteIndexEntry{Prototyping Distributed Computations} %\VignetteEngine{knitr::rmarkdown} \usepackage[utf8]{inputenc} --- ```{r echo=FALSE} knitr::opts_chunk$set( message = FALSE, warning = FALSE, error = FALSE, tidy = FALSE, cache = FALSE ) suppressMessages(suppressWarnings(library(distcomp))) dcWorkspace <- tempdir() ignore <- distcompSetup(workspace = dcWorkspace, ssl_verifyhost = 0L, ssl_verifypeer = 0L) ``` ## Introduction The `distcomp` package allows one to perform computations on data that is row-partitioned data among several sites. Implemented methods include stratified Cox regression and singular value decomposition (SVD). The distributed computations rely on a REST API for R, such as that provided by [Opencpu](https://www.opencpu.org) for example. In order to prototype new methods for `distcomp`, it is convenient to write code that does all computations locally, at least during the development phase. This allows for easy debugging and code changes. The step to using a REST call is merely one step off. For example, in the context of distributed multiple imputation, the distributed SVD is a building block. So one has to be able to be able to run the svd operation not only in a modular fashion, but also quickly to test code. Previous versions of `distcomp` assumed all computations were done using `opencpu` remote calls. This meant that the results were time-consuming to check; for reasonably sized problems one would have to wait for hours. Version 1.1 of `distcomp` addresses this by allowing local computations, in essence _faking_ the `opencpu` calls. All calculations are done locally at normal speeds (rather than a https `POST` request) while ensuring that the sites are only privy to their own data. ## An Example ```{r} available <- availableComputations() ( computationNames <- names(available) ) ``` We will perform an SVD on a 1500 by 20 matrix where the matrix is distributed across three sites each having a 500 by 20 matrix. We will extract the first 10 components using this distributed algorithm. We first define the computation. ```{r} svdDef <- data.frame(compType = "RankKSVD", rank = 20L, ncol = 20L, id = "SVD", stringsAsFactors = FALSE) ``` We generate some synthetic data. ```{r} set.seed(12345) ## Three sites nSites <- 3 siteData <- lapply(seq.int(nSites), function(i) matrix(rnorm(10000), ncol=20)) ``` We now create local objects to represent the sites, a list of three items, each with a name and its specific data. ```{r} sites <- lapply(seq.int(nSites), function(x) list(name = paste0("site", x), worker = makeWorker(defn = svdDef, data = siteData[[x]]) )) ``` We are now ready to create a master object and add sites to it. ```{r} master <- makeMaster(svdDef) for (site in sites) { master$addSite(name = site$name, worker = site$worker) } ``` When any of the sites specify a `worker` argument in the call to `addSite`, it is an indication that the computation is to be performed locally. Otherwise, a URL will have to be specified using the `url` argument to `addSite`. (Only one of `worker` or `url` can be specified.) Finally, we are ready to run the decomposition, specifying a maximum number of iterations, so that things don't go haywire. This takes less than 5 seconds on my laptop. 
Finally, we are ready to run the decomposition, specifying a maximum number of iterations so that things don't go haywire. This takes less than 5 seconds on my laptop.

```{r}
system.time(result <- master$run(max.iter = 10000))
```

The result is a named list of two elements: the diagonal values $d$ and the matrix $v$ of right singular vectors. We can compare the results obtained here with those from an SVD on the full data.

```{r}
full_data <- do.call(rbind, siteData)
full_svd <- svd(full_data)
```

Let's print the singular values side by side.

```{r}
d_table <- data.frame(truth = full_svd$d, distcomp = result$d)
knitr::kable(d_table)
```

We can also compare the $v$ matrix, taking into account that the signs of corresponding singular vectors may differ.

```{r}
norm(abs(result$v) - abs(full_svd$v), "F")
```

## Stratified Cox Model

The approach for the stratified Cox model is similar. We start with a definition.

```{r}
coxDef <- data.frame(compType = "StratifiedCoxModel",
                     formula = "Surv(time, censor) ~ age + becktota + ndrugfp1 + ndrugfp2 + ivhx3 + race + treat",
                     projectName = "STCoxTest",
                     projectDesc = "STCox Project Desc",
                     stringsAsFactors = FALSE)
```

We now read in the data for the two sites.

```{r}
## Two sites
siteDataFiles <- file.path(system.file("ex", package = "distcomp"),
                           c("uis-site1.csv", "uis-site2.csv"))
siteData <- lapply(siteDataFiles, read.csv)
```

We create local objects to represent the two sites, each with a name and its specific data.

```{r}
sites <- lapply(seq_along(siteData),
                function(x) list(name = paste0("site", x),
                                 worker = makeWorker(defn = coxDef, data = siteData[[x]])))
```

We are now ready to create a master object and add sites to it.

```{r}
master <- makeMaster(coxDef)
for (site in sites) {
    master$addSite(name = site$name, worker = site$worker)
}
```

Now we can run the model.

```{r}
result <- master$run()
print(master$summary(), digits = 4)
```

The above result will be the same as what we obtain from a stratified Cox regression fit on the combined data.

```{r}
coxFit <- survival::coxph(formula = Surv(time, censor) ~ age + becktota + ndrugfp1 +
                              ndrugfp2 + ivhx3 + race + treat + strata(site),
                          data = rbind(siteData[[1]], siteData[[2]]))
summary(coxFit)$coefficients
```

## Session Info

```{r}
sessionInfo()
```