Don’t get me wrong. Concurrency is already “groovy” in Groovy, but with each passing day we tend to focus on higher and higher levels of abstraction. From GPars to Rx, we treat concurrency as a byproduct of our paradigms, so when things get ugly (as usually happens with concurrency) we don’t have a “Plan B”.
In this talk I’ll try to ground some of the flavors of concurrency available in the Groovy language and the JVM ecosystem, starting with threads and working up to higher levels such as Rx and even Akka.
28. Some people, when confronted with a problem,
think "I know, I'll use regular expressions."
Now they have two problems.
- Jamie Zawinski
CONCURRENCY
8 concurrently
Alonso Torres
35. Binary operation
(Int, Int) → Int
Commutative: we don’t care about the order
a * b * c * d = d * c * b * a
Associative: we can split and combine the tasks
a * b * c * d = (a * b) * (c * d)
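A minimal sketch of why these two properties matter for the rest of the talk, using Groovy's `inject` and `collate` list methods. The sample list is illustrative only: the product can be taken in any order, or split into chunks that are reduced separately and combined afterwards.

```groovy
// Multiplication is a binary operation on integers
def numbers = [2g, 3g, 4g, 5g]

// Serial product, left to right
def serial = numbers.inject(1g) { acc, n -> acc * n }

// Commutative: the order of the operands does not matter
def reversed = numbers.reverse().inject(1g) { acc, n -> acc * n }

// Associative: split into chunks, reduce each chunk, then combine the partial results
def partials = numbers.collate(2).collect { chunk ->
    chunk.inject(1g) { acc, n -> acc * n }
}
def combined = partials.inject(1g) { acc, n -> acc * n }
```

Each chunk in `partials` could be computed by a different worker, which is what the concurrent versions of factorial rely on.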
36. def factorial(def num) {
    def result = 1g   // BigInteger literal
    (1..num).each {
        result = result * it
    }
    return result
}
42. Threads
○ Basic unit of concurrency for the JVM
○ Managed by the OS
○ There is always at least 1 thread (main thread)
43. def factorial(def num) {
    def result = 1g
    def ts = []                      // Store the threads
    (1 .. num).each { n ->
        ts << Thread.start {         // Thread creation
            result = result * n      // not atomic: concurrent updates can be lost
        }
    }
    ts*.join()                       // Wait to finish
    return result
}
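The read-modify-write on `result` in the threaded version is not atomic, so two threads can read the same value and one update is lost. A minimal sketch of the classic fix, assuming a plain lock object is acceptable (the `lock` name is introduced here for illustration and is not from the slides):

```groovy
def factorial(def num) {
    def result = 1g
    def lock = new Object()          // hypothetical guard object for the shared state
    def ts = []
    (1 .. num).each { n ->
        ts << Thread.start {
            // Only one thread at a time may read and update result
            synchronized (lock) {
                result = result * n
            }
        }
    }
    ts*.join()
    return result
}
```

This gives a correct answer, but every multiplication is serialized behind the lock, so the threads buy nothing in speed.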
56. def factorial(def num) {
    def result = new AtomicReference(1g)   // Concurrent util
    def ts = []
    (1 .. num).each { n ->
        ts << Thread.start {
            result = result * n            // No locking!! Just a bit of Groovy...
        }
    }
    ts*.join()
    return result.get()
}
58. // Add multiplication to the metaclass
AtomicReference.metaClass.multiply = { val ->
    def old = delegate.get()
    // Retries until it’s allowed to change
    while (!delegate.compareAndSet(old, old * val)) {
        old = delegate.get()
    }
    return delegate
}
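For comparison, Java 8 and later provide `AtomicReference.accumulateAndGet`, which performs the same kind of atomic update without touching the metaclass. A minimal sketch, assuming Java 8+ and an explicit coercion of the closure to `BinaryOperator`:

```groovy
import java.util.concurrent.atomic.AtomicReference
import java.util.function.BinaryOperator

def factorial(def num) {
    def result = new AtomicReference(1g)
    def ts = []
    (1 .. num).each { n ->
        ts << Thread.start {
            // Atomically replaces the current value with (current * n)
            result.accumulateAndGet(n as BigInteger, { a, b -> a * b } as BinaryOperator)
        }
    }
    ts*.join()
    return result.get()
}
```

The metaclass version keeps the `result * n` syntax; this one trades that for a standard library call.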
59. def factorial(def num) {
    def result = new AtomicReference(1g)
    def ts = []
    (1 .. num).each { n ->
        ts << Thread.start {
            result = result * n   // Safe write to atomic variable
        }
    }
    ts*.join()
    return result.get()
}
61. def factorial(def num) {
    def result = new AtomicReference(1g)
    def ts = []
    (1 .. num).each { n ->
        ts << Thread.start {      // Potentially thousands of threads
            result = result * n
        }
    }
    ts*.join()
    return result.get()
}
62. def factorial(def num) {
    def result = new AtomicReference(1g)
    // The thread pool reuses its threads once a task is done
    def threadPool = Executors.newFixedThreadPool(10)
    def fs = []
    (1 .. num).each { n ->
        // We’re sending a task that will be completed eventually
        fs << threadPool.submit {
            result = result * n
        }
    }
    fs*.get()
    threadPool.shutdown()   // release the pool threads so the JVM can exit
    return result.get()
}
67. def product(from, to) {
    def result = 1g
    (from .. to).each { n ->
        result = result * n
    }
    return result
}
68. int batches = batchesForNum(number)      // Divide into batches
(0 ..< batches).each { batch ->
    fs << pool.submit {                     // Each batch is one task on the pool
        def from = batchFrom(number, batch)
        def to = batchTo(number, batch)
        def current = product(from, to)     // “product” is serial
        result = result * current
    }
}
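The slides do not show `batchesForNum`, `batchFrom` or `batchTo`. A minimal sketch of how they might look, assuming fixed-size batches of 1000 numbers (the `batchSize` helper and its value are hypothetical) and using the JDK's `accumulateAndGet` for the shared result:

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicReference
import java.util.function.BinaryOperator

int batchSize() { 1000 }   // hypothetical value, not taken from the slides

// Number of batches needed to cover 1..number (integer ceiling division)
int batchesForNum(int number) { (number + batchSize() - 1).intdiv(batchSize()) }
// First and last number (inclusive) of the given zero-based batch
int batchFrom(int number, int batch) { batch * batchSize() + 1 }
int batchTo(int number, int batch) { Math.min((batch + 1) * batchSize(), number) }

def product(from, to) {
    def result = 1g
    (from .. to).each { n -> result = result * n }
    return result
}

def factorial(int number) {
    def result = new AtomicReference(1g)
    def pool = Executors.newFixedThreadPool(10)
    try {
        def fs = (0 ..< batchesForNum(number)).collect { batch ->
            pool.submit({
                def current = product(batchFrom(number, batch), batchTo(number, batch))
                // One atomic update per batch instead of one per number
                result.accumulateAndGet(current, { a, b -> a * b } as BinaryOperator)
            } as Callable)
        }
        fs*.get()   // wait for every batch
        return result.get()
    } finally {
        pool.shutdown()
    }
}
```

Batching cuts contention on the shared value from one write per number to one write per batch.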
71. Shared mutable state
○ The threads are competing to write/read
○ Mutex = “safe zone” for accessing
○ Non-determinism, performance problems
○ Memory-wise it is good
76. def factorial(number) {
    (1g .. number)
        .stream()                     // Create a stream from a range
        .parallel()                   // Run it in parallel
        .reduce { a, b -> a * b }     // We can reduce because it’s an associative function
        .get()
}
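One detail worth knowing about the range: in Groovy, `(1g .. 0g)` is a reverse range containing 1 and 0, so a reduction over it yields 0 rather than 1. A minimal sketch that guards small inputs and passes an identity value to `reduce`, assuming Java 8 streams; the guard is an addition for illustration, not part of the slide:

```groovy
import java.util.function.BinaryOperator

def factorial(BigInteger number) {
    // Assumption: anything below 2 is treated as 1, because (1g .. 0g) would be [1, 0]
    if (number < 2g) return 1g
    (1g .. number)
        .parallelStream()
        // With an identity value, reduce returns the result directly instead of an Optional
        .reduce(1g, { a, b -> a * b } as BinaryOperator)
}
```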
77. Functional-style programming
○ You have to get used to it
○ Harder to distribute (wait for it…)
○ Higher level means less flexibility
85. def factorial(def num) {
    def result = new AtomicReference(1g)   // Shared state :(((
    GParsPool.withPool(10) {               // Abstraction over thread pool
        (1..num).eachParallel {            // Parallel collections
            result = result * it
        }
    }
    return result.get()
}
98. def factorial(def num) {
    def result = 0                             // Owned by the “coordinator”
    def coordinator = actor {
        // Starts a calculator with the whole calculation
        spawnCalculator().send([1g, num])
        react { msg ->                         // Waits for the answer
            result = msg
        }
    }
    coordinator.join()                         // Synchronizes with the main thread
    return result
}
99. def spawnCalculator() {
    actor {
        react { msg ->                      // Waits for requests
            def (from, to) = msg
            def origin = sender
            if (to - from < 1000) {
                reply product(from, to)     // Serial “base” case
            } else {
                ....                        // Next slide
            }
        }
    }
}
100. // Splits in half and “delegates” to its children
def half = from + ((to - from) / 2) as BigInteger
def child1 = spawnCalculator()
def child2 = spawnCalculator()
child1.send([from, half])
child2.send([half+1, to])
// Waits for the children’s responses
react { a ->
    react { b ->
        origin.send(a * b)
    }
}
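The same split-in-half recursion can be tried without GPars. The sketch below is not the actor implementation from the slides: it swaps in the JDK's fork/join framework (`RecursiveTask`, Java 8 `ForkJoinPool.commonPool()`) and keeps only the shape of the algorithm, including the threshold of 1000. The `ProductTask` class name is made up for this example.

```groovy
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.RecursiveTask

class ProductTask extends RecursiveTask<BigInteger> {
    BigInteger from
    BigInteger to

    @Override
    protected BigInteger compute() {
        // Serial base case, same threshold as the actor version
        if (to - from < 1000) {
            return (from .. to).inject(1g) { acc, n -> acc * n }
        }
        // Split in half and delegate each half to a child task
        BigInteger half = from + (to - from).divide(2g)
        def left = new ProductTask(from: from, to: half)
        def right = new ProductTask(from: half + 1g, to: to)
        left.fork()                            // run the left half asynchronously
        return right.compute() * left.join()   // compute the right half here, then combine
    }
}

def factorial(BigInteger num) {
    ForkJoinPool.commonPool().invoke(new ProductTask(from: 1g, to: num))
}
```

Unlike actors, the child tasks here return values to their parent directly instead of exchanging messages, so there is no `sender` or `reply` to manage.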
105. Clojure
○ Functional language for the JVM
○ Great data structures with concurrency in mind
○ But… at the end of the day it is still bytecode
106. def factorial(def num) {
    def result = new Atom(1g)              // The atom “owns” the value
    GParsPool.withPool(10) {
        (1..num).eachParallel { current ->
            // We send the operation we want to apply as a function
            result.swap({
                value -> value * current
            } as IFn)                      // Coerce the closure to Clojure’s function type
        }
    }
    return result.deref()
}
117. RDDs
○ Resilient Distributed Dataset
○ Abstraction to work with distributed collections
○ “Spark’s streams”
118. def conf = new SparkConf()
    .setMaster("local[8]")          // Set the master node (here local)
    .setAppName("FactorialSpark")
this.sparkContext = new JavaSparkContext(conf)
119. BigInteger calculate(BigInteger number) {
    def result = this.sparkContext
        .parallelize(1g .. number)                 // Create an RDD with our set
        // Same as with streams; “dehydrate” is needed to serialize the closure
        .reduce({ a, b -> a * b }.dehydrate())
    return result
}
125. void "BlockingVariable Sample"() {
    given:
    def isDone = new BlockingVariable()
    and:
    def uploader = new UploaderService("file") {
        void setIsDone(boolean v) {
            isDone.set(v)   // Sets the blocking variable when it is done
        }
    }
    when:
    uploader.start()
    then:
    isDone.get()            // Blocks until finished
}
138. Attributions
Pencil icon: Created by Souvik Bhattacharjee from the Noun Project
Thief icon: Created by Gregor Cresnar from the Noun Project
Block icon: Created by mikicon from the Noun Project
Checklist icon: Created by Delwar Hossain from the Noun Project
Gui icon: Created by Ralf Schmitzer from the Noun Project