模块 Atomic

module Atomic: sig .. end

原子引用。

请参阅以下 示例。另请参阅手册中的“内存模型:难题”章节。


type !'a t 

对类型为 'a 的值的原子(可变)引用。

val make : 'a -> 'a t

创建原子引用。

val make_contended : 'a -> 'a t

创建一个原子引用,它在缓存行中单独存在。它占用的内存是使用 make v 分配的内存的 4-16 倍。

主要目的是防止虚假共享和由此带来的性能下降。当 CPU 执行原子操作时,它会暂时获取包含原子引用的整个缓存行的所有权。如果多个原子引用共享相同的缓存行,则同时修改这些分离的内存区域变得不可能,这会导致瓶颈。因此,作为一般准则,如果原子引用正在经历争用,则为其分配自己的缓存行可能会提高性能。

val get : 'a t -> 'a

获取原子引用的当前值。

val set : 'a t -> 'a -> unit

为原子引用设置新值。

val exchange : 'a t -> 'a -> 'a

为原子引用设置新值,并返回当前值。

val compare_and_set : 'a t -> 'a -> 'a -> bool

compare_and_set r seen v 仅当 r 的当前值与 seen 物理相等时,才会将 r 的新值设置为 v - 比较和设置原子地发生。如果比较成功(因此设置发生),则返回 true,否则返回 false

val fetch_and_add : int t -> int -> int

fetch_and_add r n 原子地将 r 的值增加 n,并返回当前值(在增量之前)。

val incr : int t -> unit

incr r 原子地将 r 的值增加 1

val decr : int t -> unit

decr r 原子地将 r 的值减少 1

示例

基本线程协调

一个基本用例是拥有以线程安全的方式更新的全局计数器,例如在程序执行的 I/O 上保留某些类型的指标。另一个基本用例是协调给定程序中线程的终止,例如当一个线程找到答案时,或者当用户关闭程序时。

例如,在这里,我们将尝试找到一个其哈希满足基本属性的数字。为此,我们将运行多个线程,这些线程将尝试随机数字,直到找到一个有效的数字。

当然,下面的输出是一个示例运行,并且每次运行程序时都会发生变化。

    (* use for termination *)
    let stop_all_threads = Atomic.make false

    (* total number of individual attempts to find a number *)
    let num_attempts = Atomic.make 0

    (* find a number that satisfies [p], by... trying random numbers
       until one fits. *)
    let find_number_where (p:int -> bool) =
      let rand = Random.State.make_self_init() in
      while not (Atomic.get stop_all_threads) do

        let n = Random.State.full_int rand max_int in
        ignore (Atomic.fetch_and_add num_attempts 1 : int);

        if p (Hashtbl.hash n) then (
          Printf.printf "found %d (hash=%d)\n%!" n (Hashtbl.hash n);
          Atomic.set stop_all_threads true; (* signal all threads to stop *)
        )
      done;;


    (* run multiple domains to search for a [n] where [hash n <= 100] *)
    let () =
      let criterion n = n <= 100 in
      let threads =
        Array.init 8
          (fun _ -> Domain.spawn (fun () -> find_number_where criterion))
      in
      Array.iter Domain.join threads;
      Printf.printf "total number of attempts: %d\n%!"
        (Atomic.get num_attempts) ;;

    - : unit = ()
    found 1651745641680046833 (hash=33)
    total number of attempts: 30230350
    

Treiber 栈

另一个示例是基本的 Treiber 栈(一个线程安全的栈),可以在线程之间安全共享。

请注意,pushpop 都是递归的,因为它们尝试将新的栈(多一个或少一个元素)与旧的栈交换。这是乐观并发:例如,push stack x 的每次迭代都会获取旧的栈 l,并希望在它尝试用 x::l 替换 l 之前,没有其他人有时间修改列表。如果 compare_and_set 失败,这意味着我们过于乐观,必须重试。

    type 'a stack = 'a list Atomic.t

    let rec push (stack: _ stack) elt : unit =
      let cur = Atomic.get stack in
      let success = Atomic.compare_and_set stack cur (elt :: cur) in
      if not success then
        push stack elt

    let rec pop (stack: _ stack) : _ option =
      let cur = Atomic.get stack in
      match cur with
      | [] -> None
      | x :: tail ->
        let success = Atomic.compare_and_set stack cur tail in
        if success then Some x
        else pop stack

    # let st = Atomic.make []
    # push st 1
    - : unit = ()
    # push st 2
    - : unit = ()
    # pop st
    - : int option = Some 2
    # pop st
    - : int option = Some 1
    # pop st
    - : int option = None