Mutexes in R Programming Language
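In the previous example we saw how to manage simple counter state using atomic operations. For more complex state we can use a mutex to safely access data from several concurrent workers. R has no user-level threads; the parallel package runs concurrent work in separate worker processes, so the mutex has to coordinate processes rather than threads.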
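library(parallel)
# Container holds a set of named counters. The cluster workers
# are separate R processes with their own memory, so the counters
# live in an RDS file that every worker can reach, and a lock
# directory acts as the mutex that synchronizes access to it.
defineContainer <- function() {
  setRefClass("Container",
    fields = list(
      mu = "character",    # path of the lock directory (the mutex)
      store = "character"  # path of the RDS file holding the counters
    ),
    methods = list(
      initialize = function() {
        mu <<- tempfile("mutex-")
        store <<- tempfile("counters-", fileext = ".rds")
        saveRDS(list(a = 0, b = 0), store)
      },
      inc = function(name) {
        # Lock the mutex before accessing the counters; unlock it
        # at the end of the function using on.exit(). dir.create()
        # fails while the directory exists, so only one process
        # at a time gets past the loop.
        while (!dir.create(mu, showWarnings = FALSE)) Sys.sleep(0.001)
        on.exit(unlink(mu, recursive = TRUE))
        state <- readRDS(store)
        state[[name]] <- state[[name]] + 1
        saveRDS(state, store)
      },
      counters = function() {
        # Read the current counters from the shared file
        readRDS(store)
      }
    )
  )
}
Container <- defineContainer()
# Create the container; initialize() sets up the mutex path
# and writes the initial counters.
container <- Container$new()
# This function increments a named counter in a loop
doIncrement <- function(container, name, n) {
  for (i in seq_len(n)) {
    container$inc(name)
  }
}
# Run several workers concurrently; note that they all
# access the same Container state, and two of them
# access the same counter.
cl <- makeCluster(3)
clusterExport(cl, c("defineContainer", "doIncrement", "container"))
# Each worker is a fresh R process, so define the class there too
invisible(clusterEvalQ(cl, { defineContainer(); NULL }))
results <- parLapply(cl, list(
  list(name = "a", n = 10000),
  list(name = "a", n = 10000),
  list(name = "b", n = 10000)
), function(args) {
  doIncrement(container, args$name, args$n)
})
stopCluster(cl)
# Print the final state of the counters
print(container$counters())
Running the program shows that the counters updated as expected.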
$ Rscript mutexes.R
$a
[1] 20000
$b
[1] 10000
Next we’ll look at implementing this same state management task using only threads and message passing.
In this R implementation:
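The parallel package provides the concurrency. Its cluster workers are separate R processes rather than threads, so each worker has its own memory.
The Container class is an R reference class, which allows for mutable state.
Base R has no built-in mutex. lockEnvironment() only prevents bindings from being added to or removed from an environment, and base R has no unlockEnvironment(), so a lock shared by several processes must live somewhere all of them can reach, such as the file system.
parLapply() runs the increment operations concurrently across the workers.
clusterExport() copies the necessary objects to every worker.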
The goal is the same as in the original example: safe concurrent access to shared state, adapted to R’s process-based concurrency model and syntax.
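The following sketch illustrates why the concurrency model matters. It assumes a local two-worker cluster, and the state environment and its field n are hypothetical names chosen for the example. Each worker increments its own copy of the exported object, so the parent's value should remain unchanged.

```r
library(parallel)

# Hypothetical shared object: an environment holding one counter
state <- new.env()
state$n <- 0

cl <- makeCluster(2)
# Each worker receives its own serialized copy of `state`
clusterExport(cl, "state", envir = environment())

# Every worker increments the copy that lives in its own process
worker_values <- parLapply(cl, 1:2, function(i) {
  state$n <- state$n + 1
  state$n
})
stopCluster(cl)

print(unlist(worker_values))  # values seen inside the workers
print(state$n)                # the parent's counter is unchanged
```

Because in-memory objects are copied rather than shared, any state that several workers must update together has to be kept outside the R processes, or be returned to the parent and combined there.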