这个例子来源于 Scala 圣经级教程《Functional Programming in Scala》。本人跟着书中的代码敲了一遍,又写了一些测试代码验证其正确性,所以放在这里做个备忘。贴出来只是为了方便自己;如果看不懂但又感兴趣,就去看原书吧……
> 注:如果这个并发库使用的线程池只有唯一一条工作线程,就会导致线程阻塞(死锁),可以参考 main 方法末尾的示例。阻塞的原因与代码中 fork 的实现细节有关:fork 提交的任务会占用一条工作线程并在其中阻塞等待内部任务的结果。本人会在下一篇文章中提供一个非阻塞的版本……
package parallelism
import java.util.concurrent._
/**
 * A small library for describing parallel computations as plain values.
 *
 * A `Par[A]` does nothing by itself: it is a function that, once handed an
 * `ExecutorService`, yields a `java.util.concurrent.Future[A]`.
 */
object Par {
  // Enables the `toParOps` implicit conversion below without a feature warning.
  import scala.language.implicitConversions

  /** A description of a computation that produces an `A` when run against an `ExecutorService`. */
  type Par[A] = ExecutorService => Future[A]

  /** Runs the computation `a` on the executor `s` and returns the resulting `Future`. */
  def run[A](s: ExecutorService)(a: Par[A]): Future[A] = a(s)

  /**
   * `unit` is represented as a function that returns a `UnitFuture`,
   * which is a simple implementation of `Future` that just wraps a constant value.
   * It doesn't use the `ExecutorService` at all. It's always done and can't be cancelled.
   * Its `get` method simply returns the value that we gave it.
   */
  def unit[A](a: A): Par[A] = (es: ExecutorService) => UnitFuture(a)

  /** An already-completed `Future` holding `get`; it cannot be cancelled and ignores timeouts. */
  private final case class UnitFuture[A](get: A) extends Future[A] {
    override def cancel(mayInterruptIfRunning: Boolean): Boolean = false

    override def isCancelled: Boolean = false

    override def isDone: Boolean = true

    override def get(timeout: Long, unit: TimeUnit): A = get
  }

  /**
   * Combines the results of two computations with `f`.
   *
   * `map2` doesn't evaluate the call to `f` in a separate logical thread,
   * in accord with our design choice of having `fork` be the sole function in the API
   * for controlling parallelism. We can always do `fork(map2(a,b)(f))` if we want the
   * evaluation of `f` to occur in a separate thread.
   *
   * Both inputs are started before either result is awaited; the calling thread then
   * blocks on each `get` in turn, so the returned `Future` is already complete.
   */
  def map2[A, B, C](a: Par[A], b: Par[B])(f: (A, B) => C): Par[C] = (es: ExecutorService) => {
    val af = a(es)
    val bf = b(es)
    UnitFuture(f(af.get, bf.get))
  }

  /**
   * Marks `a` for evaluation on a thread of the executor.
   *
   * This is the simplest and most natural implementation of `fork`, but there are some
   * problems with it -- for one, the outer `Callable` will block waiting for the "inner"
   * task to complete. Since this blocking occupies a thread in our thread pool, or whatever
   * resource backs the `ExecutorService`, this implies that we're losing out on some
   * potential parallelism. Essentially, we're using two threads when one should suffice.
   * With a bounded pool the blocked outer task can hold the thread the inner task needs,
   * in which case the computation never completes.
   */
  def fork[A](a: => Par[A]): Par[A] = es => es.submit(new Callable[A] {
    override def call(): A = a(es).get
  })

  /** Wraps the unevaluated expression `a` so that it is computed on the executor. */
  def lazyUnit[A](a: => A): Par[A] = fork(unit(a))

  /** Lifts `f` into a function whose application is evaluated asynchronously. */
  def asyncF[A, B](f: A => B): A => Par[B] = a => lazyUnit(f(a))

  /** Transforms the result of `pa` with `f`; expressed through `map2` with a dummy unit value. */
  def map[A, B](pa: Par[A])(f: A => B): Par[B] = map2(pa, unit(()))((a, _) => f(a))

  /** Sorts the list produced by `parList`. */
  def sortPar(parList: Par[List[Int]]): Par[List[Int]] = map(parList)(_.sorted)

  /** Collects the results of `l` into one list by folding from the right with `map2`. */
  def sequence_simple[A](l: List[Par[A]]): Par[List[A]] =
    l.foldRight[Par[List[A]]](unit(List[A]()))((h, t) => map2(h, t)(_ :: _))

  /**
   * This implementation forks the recursive step off to a new logical thread,
   * making it effectively tail-recursive. However, we are constructing
   * a right-nested parallel program, and we can get better performance by
   * dividing the list in half, and running both halves in parallel.
   * See `sequenceBalanced` below.
   */
  def sequenceRight[A](as: List[Par[A]]): Par[List[A]] = as match {
    case Nil => unit(Nil)
    case h :: t => map2(h, fork(sequenceRight(t)))(_ :: _)
  }

  /**
   * We define `sequenceBalanced` using `IndexedSeq`, which provides an
   * efficient function for splitting the sequence in half.
   */
  def sequenceBalanced[A](as: IndexedSeq[Par[A]]): Par[IndexedSeq[A]] = fork {
    if (as.isEmpty) unit(Vector())
    else if (as.length == 1) map(as.head)(a => Vector(a))
    else {
      val (l, r) = as.splitAt(as.length / 2)
      map2(sequenceBalanced(l), sequenceBalanced(r))(_ ++ _)
    }
  }

  /** Collects the results of `as` into one list, preserving order; built on `sequenceBalanced`. */
  def sequence[A](as: List[Par[A]]): Par[List[A]] =
    map(sequenceBalanced(as.toIndexedSeq))(_.toList)

  /** Keeps the elements of `l` that satisfy `f`, testing each element asynchronously. */
  def parFilter[A](l: List[A])(f: A => Boolean): Par[List[A]] = {
    // Each element becomes a zero- or one-element list so the results can simply be flattened.
    val pars: List[Par[List[A]]] = l.map(asyncF((a: A) => if (f(a)) List(a) else List()))
    map(sequence(pars))(_.flatten)
  }

  /** Runs both computations on `e`, blocks for both results and compares them with `==`. */
  def equal[A](e: ExecutorService)(p: Par[A], p2: Par[A]): Boolean = p(e).get == p2(e).get

  /** Defers construction of `fa` until the computation is run, without submitting a task. */
  def delay[A](fa: => Par[A]): Par[A] = es => fa(es)

  /** Runs `cond`, blocks for its result, then runs `t` if it is true and `f` otherwise. */
  def choice[A](cond: Par[Boolean])(t: Par[A], f: Par[A]): Par[A] = es =>
    if (run(es)(cond).get) t(es)
    else f(es)

  /** Runs `n`, blocks for its result, then runs the computation at that index of `choices`. */
  def choiceN[A](n: Par[Int])(choices: List[Par[A]]): Par[A] =
    es => {
      val ind = run(es)(n).get
      run(es)(choices(ind))
    }

  /** `choice` expressed with `choiceN`: index 0 is the true branch, index 1 the false branch. */
  def choiceViaChoiceN[A](a: Par[Boolean])(ifTrue: Par[A])(ifFalse: Par[A]): Par[A] =
    choiceN(map(a)(b => if (b) 0 else 1))(List(ifTrue, ifFalse))

  /** Runs `key`, blocks for its result, then runs the computation stored under that key. */
  def choiceMap[K, V](key: Par[K])(choices: Map[K, Par[V]]): Par[V] =
    es => {
      val kVal = run(es)(key).get
      run(es)(choices(kVal))
    }

  /** Same operation as `flatMap`, kept under the name used while deriving it. */
  def chooser[A, B](p: Par[A])(choices: A => Par[B]): Par[B] = flatMap(p)(choices)

  /**
   * `chooser` is usually called `flatMap` or `bind`, and it is a higher-level
   * abstraction that subsumes `chooser`/`choiceMap`/`choice`/`choiceN`.
   *
   * Runs `p`, blocks for its result, then runs the computation selected by `choices`.
   */
  def flatMap[A, B](p: Par[A])(choices: A => Par[B]): Par[B] =
    es => {
      val k = run(es)(p).get
      run(es)(choices(k))
    }

  /**
   * `choice` expressed with `flatMap`: runs `t` when `p` yields true and `f` when it
   * yields false. Note that the false branch `f` comes first in the parameter list,
   * which is the opposite order from `choice`.
   */
  def choiceViaFlatMap[A](p: Par[Boolean])(f: Par[A], t: Par[A]): Par[A] =
    flatMap(p)(b => if (b) t else f)

  /** `choiceN` expressed with `flatMap`. */
  def choiceNViaFlatMap[A](p: Par[Int])(choices: List[Par[A]]): Par[A] =
    flatMap(p)(i => choices(i))

  /** `join` expressed with `flatMap`. */
  def joinViaFlatMap[A](pp: Par[Par[A]]): Par[A] = flatMap(pp)(p => p)

  /** Flattens a nested computation: runs the outer one, blocks for the inner one, then runs it. */
  def join[A](a: Par[Par[A]]): Par[A] = es => run(es)(run(es)(a).get())

  /** `flatMap` expressed with `join` and `map`. */
  def flatMapViaJoin[A, B](p: Par[A])(f: A => Par[B]): Par[B] = join(map(p)(f))

  /** Wraps a `Par` in `ParOps` so that infix syntax can be attached to it. */
  implicit def toParOps[A](p: Par[A]): ParOps[A] = new ParOps(p)

  /** Holder for infix operations on `Par`; it currently defines none. */
  class ParOps[A](p: Par[A]) {}
}
/** Small demonstration program for the `Par` library. */
object Examples {
  import Par._

  /**
   * Sums `ints` by divide and conquer: sequences of at most one element are returned
   * directly (0 when empty), longer ones are split in half and summed recursively.
   */
  def sum(ints: IndexedSeq[Int]): Int =
    if (ints.size <= 1) ints.headOption.getOrElse(0)
    else {
      val (l, r) = ints.splitAt(ints.length / 2)
      sum(l) + sum(r)
    }

  /**
   * Prints the results of a few `Par` computations on a two-thread pool, then repeats
   * the forked one on a single-thread pool to show that it blocks.
   */
  def main(args: Array[String]): Unit = {
    println(sum(Range(0, 10)))

    val es = Executors.newFixedThreadPool(2)

    val parInt: Par[Int] = pool => pool.submit(new Callable[Int] {
      override def call(): Int = {
        // Read the clock once so both operands agree and the result is always 1.
        val now = System.currentTimeMillis
        ((now + 1.0) / now).toInt
      }
    })

    val parString: Par[String] = pool => pool.submit(new Callable[String] {
      override def call(): String = " |<*>| " * 3
    })

    val parIntMap = Par.map(parInt)((a: Int) => a * 8)
    val parMap2 = Par.map2[Int, String, (Int, String)](parInt, parString)((_ -> _))

    println(run(es)(parInt).get)
    println(run(es)(parIntMap).get)
    println(run(es)(parMap2).get)
    println(run(es)(fork(parMap2)).get)
    es.shutdown()

    // Intentional demonstration of the limitation of `fork`: the forked task takes the
    // pool's only worker thread and then waits on tasks queued behind it, so this `get`
    // never returns and the shutdown below is not reached.
    val singleThreadEs = Executors.newFixedThreadPool(1)
    println(run(singleThreadEs)(fork(parMap2)).get)
    singleThreadEs.shutdown()
  }
}
上述代码的运行结果是:
45
1
8
(1, |<*>| |<*>| |<*>| )
(1, |<*>| |<*>| |<*>| )
879675643@qq.com lhever
.---.
| | . __.....__ .----. .----. __.....__
| | .
'| .-'' '. \ \ / /.-
'' '.
| |< | / .-''"'-. `. ' '. /' // .-''"'-. `. .-,.--.
| | | | / /________\ \| |
' // /________\ \| .-. |
| | | | .'''-. | || || || || | | |
| | | |/.
'''. \\ .-------------''. `' .
'\ .-------------'| | | |
| | | / | | \
'-.____...---. \ / \ '-.____
...---.| |
'-
| | | | | | `. .' \ / `. .' | |
'---' | | | | `
''-...... -
' '----
' `''-...... -' | |
|
'. | '. |_|
'---' '---'