第 9 章 并行编程

在本章中,我们将介绍 OCaml 中的并行编程功能。OCaml 标准库提供了用于并行编程的底层原语。我们建议用户使用更高层的并行编程库,例如 domainslib。本教程将首先介绍使用 domainslib 进行的高级并行编程,然后介绍编译器提供的底层原语。

OCaml 区分并发和并行,并提供不同的机制来表达它们。并发是任务的重叠执行(第 12.24.2 节),而并行是任务的同时执行。特别是,并行任务在时间上重叠,而并发任务可能在时间上重叠,也可能不重叠。任务可以通过互相让步来并发执行。并发是一种程序结构机制,而并行是一种使程序运行更快的机制。如果您对 OCaml 中的并发编程机制感兴趣,请参阅有关效果处理程序的第 12.24 章和有关线程库的第 34 章。

1

域是 OCaml 中的并行单位。模块 Domain 提供了创建和管理域的原语。可以使用 spawn 函数生成新的域。

Domain.spawn (fun _ -> print_endline "I ran in parallel")
I ran in parallel - : unit Domain.t = <abstr>

spawn 函数并行地执行给定计算,与调用域并行。

域是重量级的实体。每个域都与一个操作系统线程一一对应。每个域也有自己的运行时状态,包括用于分配内存的域局部结构。因此,创建和销毁它们比较昂贵。

建议程序不要生成超过可用内核数的域。.

在本教程中,我们将实现、运行和衡量并行程序的性能。观察到的结果取决于目标机器上可用的内核数量。本教程是在配备 4 个内核和 8 个硬件线程的 2.3 GHz 四核英特尔酷睿 i7 MacBook Pro 上编写的。对于并行程序,如果域之间的协调很少,并且机器没有处于负载状态,那么在 4 个域上大约可以获得 4 倍的性能。在 4 个域之外,加速很可能低于线性。我们还将使用命令行基准测试工具 hyperfine 来基准测试我们的程序。

1.1 加入域

我们将使用该程序通过递归来计算第 n 个斐波那契数作为运行示例。用于计算第 n 个斐波那契数的顺序程序如下所示。

(* fib.ml *) let n = try int_of_string Sys.argv.(1) with _ -> 1 let rec fib n = if n < 2 then 1 else fib (n - 1) + fib (n - 2) let main () = let r = fib n in Printf.printf "fib(%d) = %d\n%!" n r let _ = main ()

该程序可以编译和基准测试,如下所示。

$ ocamlopt -o fib.exe fib.ml
$ ./fib.exe 42
fib(42) = 433494437
$ hyperfine './fib.exe 42' # Benchmarking
Benchmark 1: ./fib.exe 42
  Time (mean ± sd):     1.193 s ±  0.006 s    [User: 1.186 s, System: 0.003 s]
  Range (min … max):    1.181 s …  1.202 s    10 runs

我们看到,计算第 42 个斐波那契数大约需要 1.2 秒。

可以使用 join 函数加入生成的域以获取其结果。 join 函数等待目标域终止。以下程序并行地计算两次第 n 个斐波那契数。

(* fib_twice.ml *) let n = int_of_string Sys.argv.(1) let rec fib n = if n < 2 then 1 else fib (n - 1) + fib (n - 2) let main () = let d1 = Domain.spawn (fun _ -> fib n) in let d2 = Domain.spawn (fun _ -> fib n) in let r1 = Domain.join d1 in Printf.printf "fib(%d) = %d\n%!" n r1; let r2 = Domain.join d2 in Printf.printf "fib(%d) = %d\n%!" n r2 let _ = main ()

该程序生成了两个域,它们计算第 n 个斐波那契数。 spawn 函数返回一个 Domain.t 值,可以加入它来获取并行计算的结果。 join 函数在计算完成之前一直阻塞。

$ ocamlopt -o fib_twice.exe fib_twice.ml
$ ./fib_twice.exe 42
fib(42) = 433494437
fib(42) = 433494437
$ hyperfine './fib_twice.exe 42'
Benchmark 1: ./fib_twice.exe 42
  Time (mean ± sd):     1.249 s ±  0.025 s    [User: 2.451 s, System: 0.012 s]
  Range (min … max):    1.221 s …  1.290 s    10 runs

可以看出,由于并行性,计算两次第 n 个斐波那契数的时间几乎与计算一次的时间相同。

2 Domainslib:用于嵌套并行编程的库

让我们尝试将斐波那契函数并行化。两个递归调用可以并行执行。但是,通过为每个递归调用生成域来简单地并行化递归调用将无法正常工作,因为它会生成太多域。

(* fib_par1.ml *) let n = try int_of_string Sys.argv.(1) with _ -> 1 let rec fib n = if n < 2 then 1 else begin let d1 = Domain.spawn (fun _ -> fib (n - 1)) in let d2 = Domain.spawn (fun _ -> fib (n - 2)) in Domain.join d1 + Domain.join d2 end let main () = let r = fib n in Printf.printf "fib(%d) = %d\n%!" n r let _ = main ()
fib(1) = 1 val n : int = 1 val fib : int -> int = <fun> val main : unit -> unit = <fun>
$ ocamlopt -o fib_par1.exe fib_par1.ml
$ ./fib_par1.exe 42
Fatal error: exception Failure("failed to allocate domain")

OCaml 同时只能有 128 个域处于活动状态。尝试生成更多域将引发异常。那么,我们该如何并行化斐波那契函数呢?

2.1 使用 domainslib 并行化斐波那契

OCaml 标准库只提供并发和并行编程的底层原语,将更高层的编程库留待在核心编译器发行版之外进行开发和分发。 Domainslib 就是这样一个用于嵌套并行编程的库,这在递归斐波那契计算中可用的并行性中得到了体现。让我们使用 domainslib 来并行化递归斐波那契程序。建议您使用 opam 包管理器安装 domainslib。本教程使用 domainslib 版本 0.5.0。

Domainslib 提供了一个 async/await 机制,用于生成并行任务和等待其结果。在该机制的基础上,domainslib 提供了并行迭代器。在其核心,domainslib 有一个高效的工作窃取队列实现,以便高效地与其他域共享任务。并行斐波那契程序的实现如下所示。

(* fib_par2.ml *)
let num_domains = int_of_string Sys.argv.(1)
let n = int_of_string Sys.argv.(2)

let rec fib n = if n < 2 then 1 else fib (n - 1) + fib (n - 2)

module T = Domainslib.Task

let rec fib_par pool n =
  if n > 20 then begin
    let a = T.async pool (fun _ -> fib_par pool (n-1)) in
    let b = T.async pool (fun _ -> fib_par pool (n-2)) in
    T.await pool a + T.await pool b
  end else fib n

let main () =
  let pool = T.setup_pool ~num_domains:(num_domains - 1) () in
  let res = T.run pool (fun _ -> fib_par pool n) in
  T.teardown_pool pool;
  Printf.printf "fib(%d) = %d\n" n res

let _ = main ()

该程序以域数量和斐波那契函数的输入作为第一个和第二个命令行参数。

让我们从主函数开始。首先,我们在嵌套并行任务将运行的池中设置一组域。调用 run 函数的域也将参与执行提交到池的任务。我们在 run 函数中调用并行斐波那契函数 fib_par。最后,我们拆除池并打印结果。

对于足够大的输入(n > 20), fib_par 函数使用 async 函数在池中异步生成左右递归调用。 async 函数返回一个对结果的承诺。使用 await 函数等待承诺可以获得异步计算的结果。 await 函数调用在承诺得到解决之前一直阻塞。

对于较小的输入, fib_par 函数简单地调用顺序斐波那契函数 fib。对于较小的问题大小,切换到顺序模式很重要。否则,并行化的成本将超过可用的工作量。

为了简单起见,我们使用 ocamlfind 来编译此程序。建议用户使用 dune 来构建使用通过 opam 安装的库的程序。

$ ocamlfind ocamlopt -package domainslib -linkpkg -o fib_par2.exe fib_par2.ml
$ ./fib_par2.exe 1 42
fib(42) = 433494437
$ hyperfine './fib.exe 42' './fib_par2.exe 2 42' \
            './fib_par2.exe 4 42' './fib_par2.exe 8 42'
Benchmark 1: ./fib.exe 42
  Time (mean ± sd):     1.217 s ±  0.018 s    [User: 1.203 s, System: 0.004 s]
  Range (min … max):    1.202 s …  1.261 s    10 runs

Benchmark 2: ./fib_par2.exe 2 42
  Time (mean ± sd):    628.2 ms ±   2.9 ms    [User: 1243.1 ms, System: 4.9 ms]
  Range (min … max):   625.7 ms … 634.5 ms    10 runs

Benchmark 3: ./fib_par2.exe 4 42
  Time (mean ± sd):    337.6 ms ±  23.4 ms    [User: 1321.8 ms, System: 8.4 ms]
  Range (min … max):   318.5 ms … 377.6 ms    10 runs

Benchmark 4: ./fib_par2.exe 8 42
  Time (mean ± sd):    250.0 ms ±   9.4 ms    [User: 1877.1 ms, System: 12.6 ms]
  Range (min … max):   242.5 ms … 277.3 ms    11 runs

Summary
  './fib_par2.exe 8 42' ran
    1.35 ± 0.11 times faster than './fib_par2.exe 4 42'
    2.51 ± 0.10 times faster than './fib_par2.exe 2 42'
    4.87 ± 0.20 times faster than './fib.exe 42'

结果表明,使用 8 个域,并行斐波那契程序的运行速度比顺序版本快 4.87 倍。

2.2 并行迭代结构

许多数值算法使用 for 循环。parallel-for 原语提供了一种直接的方法来并行化此类代码。让我们以 谱范数 为例,这是来自计算机语言基准测试游戏的基准测试,并将其并行化。程序的顺序版本如下所示。

(* spectralnorm.ml *) let n = try int_of_string Sys.argv.(1) with _ -> 32 let eval_A i j = 1. /. float((i+j)*(i+j+1)/2+i+1) let eval_A_times_u u v = let n = Array.length v - 1 in for i = 0 to n do let vi = ref 0. in for j = 0 to n do vi := !vi +. eval_A i j *. u.(j) done; v.(i) <- !vi done let eval_At_times_u u v = let n = Array.length v - 1 in for i = 0 to n do let vi = ref 0. in for j = 0 to n do vi := !vi +. eval_A j i *. u.(j) done; v.(i) <- !vi done let eval_AtA_times_u u v = let w = Array.make (Array.length u) 0.0 in eval_A_times_u u w; eval_At_times_u w v let () = let u = Array.make n 1.0 and v = Array.make n 0.0 in for _i = 0 to 9 do eval_AtA_times_u u v; eval_AtA_times_u v u done; let vv = ref 0.0 and vBv = ref 0.0 in for i=0 to n-1 do vv := !vv +. v.(i) *. v.(i); vBv := !vBv +. u.(i) *. v.(i) done; Printf.printf "%0.9f\n" (sqrt(!vBv /. !vv))

观察到该程序在 eval_A_times_ueval_At_times_u 中嵌套了循环。外循环体中的每次迭代都会从 u 读取数据,但会写入 v 中的互斥内存位置。因此,外循环的迭代彼此不依赖,可以并行执行。

谱范数的并行版本如下所示。

(* spectralnorm_par.ml *)
let num_domains = try int_of_string Sys.argv.(1) with _ -> 1
let n = try int_of_string Sys.argv.(2) with _ -> 32

let eval_A i j = 1. /. float((i+j)*(i+j+1)/2+i+1)

module T = Domainslib.Task

let eval_A_times_u pool u v =
  let n = Array.length v - 1 in
  T.parallel_for pool ~start:0 ~finish:n ~body:(fun i ->
    let vi = ref 0. in
    for j = 0 to n do vi := !vi +. eval_A i j *. u.(j) done;
    v.(i) <- !vi
  )

let eval_At_times_u pool u v =
  let n = Array.length v - 1 in
  T.parallel_for pool ~start:0 ~finish:n ~body:(fun i ->
    let vi = ref 0. in
    for j = 0 to n do vi := !vi +. eval_A j i *. u.(j) done;
    v.(i) <- !vi
  )

let eval_AtA_times_u pool u v =
  let w = Array.make (Array.length u) 0.0 in
  eval_A_times_u pool u w; eval_At_times_u pool w v

let () =
  let pool = T.setup_pool ~num_domains:(num_domains - 1) () in
  let u = Array.make n 1.0  and  v = Array.make n 0.0 in
  T.run pool (fun _ ->
  for _i = 0 to 9 do
    eval_AtA_times_u pool u v; eval_AtA_times_u pool v u
  done);

  let vv = ref 0.0  and  vBv = ref 0.0 in
  for i=0 to n-1 do
    vv := !vv +. v.(i) *. v.(i);
    vBv := !vBv +. u.(i) *. v.(i)
  done;
  T.teardown_pool pool;
  Printf.printf "%0.9f\n" (sqrt(!vBv /. !vv))

观察到 parallel_for 函数与顺序版本中的 for 循环同构。除了设置和拆除池的样板代码外,不需要进行其他更改。

$ ocamlopt -o spectralnorm.exe spectralnorm.ml
$ ocamlfind ocamlopt -package domainslib -linkpkg -o spectralnorm_par.exe \
  spectralnorm_par.ml
$ hyperfine './spectralnorm.exe 4096' './spectralnorm_par.exe 2 4096' \
            './spectralnorm_par.exe 4 4096' './spectralnorm_par.exe 8 4096'
Benchmark 1: ./spectralnorm.exe 4096
  Time (mean ± sd):     1.989 s ±  0.013 s    [User: 1.972 s, System: 0.007 s]
  Range (min … max):    1.975 s …  2.018 s    10 runs

Benchmark 2: ./spectralnorm_par.exe 2 4096
  Time (mean ± sd):     1.083 s ±  0.015 s    [User: 2.140 s, System: 0.009 s]
  Range (min … max):    1.064 s …  1.102 s    10 runs

Benchmark 3: ./spectralnorm_par.exe 4 4096
  Time (mean ± sd):    698.7 ms ±  10.3 ms    [User: 2730.8 ms, System: 18.3 ms]
  Range (min … max):   680.9 ms … 721.7 ms    10 runs

Benchmark 4: ./spectralnorm_par.exe 8 4096
  Time (mean ± sd):    921.8 ms ±  52.1 ms    [User: 6711.6 ms, System: 51.0 ms]
  Range (min … max):   838.6 ms … 989.2 ms    10 runs

Summary
  './spectralnorm_par.exe 4 4096' ran
    1.32 ± 0.08 times faster than './spectralnorm_par.exe 8 4096'
    1.55 ± 0.03 times faster than './spectralnorm_par.exe 2 4096'
    2.85 ± 0.05 times faster than './spectralnorm.exe 4096'

在作者的机器上,该程序在高达 4 个域的情况下可以很好地扩展,但在 8 个域的情况下性能会变差。回想一下,该机器只有 4 个物理内核。调试和解决此性能问题超出了本教程的范围。

3 并行垃圾收集

并行 OCaml 程序可扩展性的一个重要方面是垃圾收集器 (GC) 的可扩展性。OCaml GC 旨在具有低延迟和良好的并行可扩展性。OCaml 具有一个分代垃圾收集器,它有一个小的次要堆和一个大的主要堆。新对象(最大到一定大小)将在次要堆中分配。每个域都有自己的域本地次要堆区域,新对象将在其中分配,无需与其他域同步。当一个域耗尽其次要堆区域时,它会调用对次要堆进行的停止世界收集。在停止世界的部分,所有域都会并行收集其次要堆区域,将幸存者迁移到主要堆。

对于主要堆,每个域都维护着域本地的、大小分段的内存池,大型对象和次要收集的幸存者将在其中分配。拥有域本地池可以避免大多数主要堆分配的同步。主要堆通过并发标记-清除算法进行收集,该算法涉及每次主要循环的几次短暂的停止世界暂停。

总的来说,用户应该期望垃圾收集器随着域数量的增加而很好地扩展,同时保持低延迟。有关垃圾收集器设计和评估的更多信息,请查看有关 将并行性改造到 OCaml 的 ICFP 2020 论文。

4 内存模型:简单部分

现代处理器和编译器积极地优化程序。这些优化在不影响顺序程序的情况下加速执行,但会导致并行程序中出现意外的行为。为了从这些优化中获益,OCaml 采用了一个 松弛内存模型,它精确地指定了程序可能会观察到的哪些 松弛行为。虽然这些模型很难直接针对它们进行编程,但 OCaml 内存模型提供了保留顺序推理简单性的配方。

首先,不可变值可以自由地在多个域之间共享,并且可以并行访问。对于可变数据结构(例如引用单元、数组和可变记录字段),程序员应避免 数据竞争。引用单元、数组和可变记录字段被称为 非原子 数据结构。当两个域并发访问非原子内存位置且没有 同步 并且至少一次访问是写入操作时,就会发生数据竞争。OCaml 提供了许多方法来引入同步,包括原子变量(第 9.7 节)和互斥锁(第 9.5 节)。

重要的是,对于无数据竞争 (DRF) 程序,OCaml 提供顺序一致 (SC) 语义 - 此类程序的观察行为可以用来自不同域的操作的交错来解释。此属性被称为 DRF-SC 保证。此外,在 OCaml 中,DRF-SC 保证是模块化的 - 如果程序的一部分没有数据竞争,那么 OCaml 内存模型将确保这些部分具有顺序一致性,即使程序的其他部分存在数据竞争。即使对于存在数据竞争的程序,OCaml 也提供强有力的保证。虽然用户可能会观察到非顺序一致的行为,但不会发生崩溃。

有关在存在数据竞争的情况下松弛行为的更多详细信息,请参阅有关内存模型难题的章节(第 10 章)。

5 阻塞同步

域可以通过 MutexConditionSemaphore 模块进行阻塞同步。这些模块与用于同步由线程库创建的线程(第 34 章)的模块相同。为了清楚起见,在本节的其余部分,我们将由线程库创建的线程称为 系统线程。以下程序使用互斥锁和条件变量实现了并发堆栈。

module Blocking_stack : sig type 'a t val make : unit -> 'a t val push : 'a t -> 'a -> unit val pop : 'a t -> 'a end = struct type 'a t = { mutable contents: 'a list; mutex : Mutex.t; nonempty : Condition.t } let make () = { contents = []; mutex = Mutex.create (); nonempty = Condition.create () } let push r v = Mutex.lock r.mutex; r.contents <- v::r.contents; Condition.signal r.nonempty; Mutex.unlock r.mutex let pop r = Mutex.lock r.mutex; let rec loop () = match r.contents with | [] -> Condition.wait r.nonempty r.mutex; loop () | x::xs -> r.contents <- xs; x in let res = loop () in Mutex.unlock r.mutex; res end

并发堆栈是使用包含三个字段的记录实现的:一个可变字段 contents,用于存储堆栈中的元素;一个 mutex,用于控制对 contents 字段的访问;一个条件变量 nonempty,用于向等待堆栈变为非空的阻塞域发出信号。

push 操作会锁定互斥锁,并使用一个新的列表更新 contents 字段,该列表的头是正在推入的元素,尾部是旧列表。在持有锁的情况下发出条件变量 nonempty 的信号,以便唤醒任何正在此条件上等待的域。如果存在等待的域,则会唤醒其中一个域。如果不存在,则 signal 操作不会有任何影响。

pop 操作会锁定互斥锁并检查堆栈是否为空。如果是,则调用域将在条件变量 nonempty 上等待,使用 wait 原语。 wait 调用会原子地挂起当前域的执行并解锁 mutex。当此域再次被唤醒(当 wait 调用返回时),它会持有对 mutex 的锁。该域会尝试再次读取堆栈的内容。如果 pop 操作发现堆栈不为空,则它会将 contents 更新为旧列表的尾部,并返回头部。

使用 mutex 来控制对共享资源 contents 的访问,在使用堆栈的多个域之间引入了足够的同步。因此,当多个域并行使用堆栈时,不会出现数据竞争。

5.1 与系统线程的交互

系统线程如何与域交互?在特定域上创建的系统线程将保持固定在该域上。一次只允许一个系统线程在特定域上运行 OCaml 代码。但是,属于特定域的系统线程可以并行运行 C 库或系统代码。属于不同域的系统线程可以并行执行。

当使用 systhreads 时,为执行传递给 Domain.spawn 的计算而创建的线程也被视为 systhread。例如,以下程序总共创建了两个域(包括初始域),每个域有两个 systhreads(包括每个域的初始 systhread)。

(* dom_thr.ml *)
let m = Mutex.create ()
let r = ref None (* protected by m *)

let task () =
  let my_thr_id = Thread.(id (self ())) in
  let my_dom_id :> int = Domain.self () in
  Mutex.lock m;
  begin match !r with
  | None ->
      Printf.printf "Thread %d running on domain %d saw initial write\n%!"
        my_thr_id my_dom_id
  | Some their_thr_id ->
      Printf.printf "Thread %d running on domain %d saw the write by thread %d\n%!"
        my_thr_id my_dom_id their_thr_id;
  end;
  r := Some my_thr_id;
  Mutex.unlock m

let task' () =
  let t = Thread.create task () in
  task ();
  Thread.join t

let main () =
  let d = Domain.spawn task' in
  task' ();
  Domain.join d

let _ = main ()
$ ocamlopt -I +threads unix.cmxa threads.cmxa -o dom_thr.exe dom_thr.ml
$ ./dom_thr.exe
Thread 1 running on domain 1 saw initial write
Thread 0 running on domain 0 saw the write by thread 1
Thread 2 running on domain 1 saw the write by thread 0
Thread 3 running on domain 0 saw the write by thread 2

该程序使用由互斥量保护的共享引用单元来在运行在两个不同域上的不同 systhreads 之间进行通信。systhread 标识符在程序中唯一标识 systhreads。初始域获得域 ID 和线程 ID 为 0。新生成的域获得域 ID 为 1。

6 与 C 绑定交互

在使用多个域进行并行执行期间,运行在某个域上的 C 代码可能与运行在其他域上的任何 C 代码并行执行,即使它们都没有释放“域锁”。在 OCaml 5.0 之前,C 绑定可能假设如果未释放 OCaml 运行时锁,那么操作全局 C 状态(例如初始化函数局部静态值)将是安全的。在使用多个域进行并行执行的情况下,这不再成立。

7 原子操作

互斥量、条件变量和信号量用于实现域之间的阻塞同步。对于非阻塞同步,OCaml 提供了 Atomic 变量。顾名思义,非阻塞同步不提供用于挂起和唤醒域的机制。另一方面,用于非阻塞同步的原语通常编译为硬件提供的原子读-修改-写原语。例如,以下程序并行递增一个非原子计数器和一个原子计数器。

(* incr.ml *) let twice_in_parallel f = let d1 = Domain.spawn f in let d2 = Domain.spawn f in Domain.join d1; Domain.join d2 let plain_ref n = let r = ref 0 in let f () = for _i=1 to n do incr r done in twice_in_parallel f; Printf.printf "Non-atomic ref count: %d\n" !r let atomic_ref n = let r = Atomic.make 0 in let f () = for _i=1 to n do Atomic.incr r done in twice_in_parallel f; Printf.printf "Atomic ref count: %d\n" (Atomic.get r) let main () = let n = try int_of_string Sys.argv.(1) with _ -> 1 in plain_ref n; atomic_ref n let _ = main ()
$ ocamlopt -o incr.exe incr.ml
$ ./incr.exe 1_000_000
Non-atomic ref count: 1187193
Atomic ref count: 2000000

观察到使用非原子计数器得到的结果低于人们天真地预期的结果。这是因为非原子 incr 函数等效于

let incr r = let curr = !r in r := curr + 1

观察到加载和存储是两个独立的操作,并且增量操作作为一个整体不是原子执行的。当两个域并行执行此代码时,它们都可能读取计数器的相同值 curr 并将其更新为 curr + 1。因此,结果将是单个增量,而不是两个增量。另一方面,原子计数器在硬件对原子性的支持下,以原子方式执行加载和存储。原子计数器返回预期的结果。

原子变量可用于域之间的低级同步。以下示例使用原子变量在两个域之间交换消息。

let r = Atomic.make None let sender () = Atomic.set r (Some "Hello") let rec receiver () = match Atomic.get r with | None -> Domain.cpu_relax (); receiver () | Some m -> print_endline m let main () = let s = Domain.spawn sender in let d = Domain.spawn receiver in Domain.join s; Domain.join d let _ = main ()
Hello val r : string option Atomic.t = <abstr> val sender : unit -> unit = <fun> val receiver : unit -> unit = <fun> val main : unit -> unit = <fun>

虽然发送方和接收方竞争访问 r,但这并不是数据竞争,因为 r 是一个原子引用。

7.1 无锁栈

Atomic 模块用于实现非阻塞、无锁数据结构。以下程序实现了一个无锁栈。

module Lockfree_stack : sig type 'a t val make : unit -> 'a t val push : 'a t -> 'a -> unit val pop : 'a t -> 'a option end = struct type 'a t = 'a list Atomic.t let make () = Atomic.make [] let rec push r v = let s = Atomic.get r in if Atomic.compare_and_set r s (v::s) then () else (Domain.cpu_relax (); push r v) let rec pop r = let s = Atomic.get r in match s with | [] -> None | x::xs -> if Atomic.compare_and_set r s xs then Some x else (Domain.cpu_relax (); pop r) end

原子栈由一个原子引用表示,该引用保存一个列表。 pushpop 操作使用 compare_and_set 原语尝试原子地更新原子引用。表达式 compare_and_set r seen vr 的值设置为 v 当且仅当其当前值与 seen 相同。重要的是,比较和更新原子地发生。如果比较成功(并且更新完成),则表达式计算结果为 true,否则计算结果为 false

如果 compare_and_set 失败,则其他一些域也尝试同时更新原子引用。在这种情况下,pushpop 操作调用 Domain.cpu_relax 以短暂地退避,允许竞争域在重试失败的操作之前取得进展。这种无锁栈实现也称为 Treiber 栈。