module Atomic:sig
..end
原子引用。
请参阅以下 示例。另请参阅手册中的“内存模型:难题”章节。
type !'a
t
对类型为 'a
的值的原子(可变)引用。
val make : 'a -> 'a t
创建原子引用。
val make_contended : 'a -> 'a t
创建一个原子引用,它在缓存行中单独存在。它占用的内存是使用 make v
分配的内存的 4-16 倍。
主要目的是防止虚假共享和由此带来的性能下降。当 CPU 执行原子操作时,它会暂时获取包含原子引用的整个缓存行的所有权。如果多个原子引用共享相同的缓存行,则同时修改这些分离的内存区域变得不可能,这会导致瓶颈。因此,作为一般准则,如果原子引用正在经历争用,则为其分配自己的缓存行可能会提高性能。
val get : 'a t -> 'a
获取原子引用的当前值。
val set : 'a t -> 'a -> unit
为原子引用设置新值。
val exchange : 'a t -> 'a -> 'a
为原子引用设置新值,并返回当前值。
val compare_and_set : 'a t -> 'a -> 'a -> bool
compare_and_set r seen v
仅当 r
的当前值与 seen
物理相等时,才会将 r
的新值设置为 v
- 比较和设置原子地发生。如果比较成功(因此设置发生),则返回 true
,否则返回 false
。
val fetch_and_add : int t -> int -> int
fetch_and_add r n
原子地将 r
的值增加 n
,并返回当前值(在增量之前)。
val incr : int t -> unit
incr r
原子地将 r
的值增加 1
。
val decr : int t -> unit
decr r
原子地将 r
的值减少 1
。
一个基本用例是拥有以线程安全的方式更新的全局计数器,例如在程序执行的 I/O 上保留某些类型的指标。另一个基本用例是协调给定程序中线程的终止,例如当一个线程找到答案时,或者当用户关闭程序时。
例如,在这里,我们将尝试找到一个其哈希满足基本属性的数字。为此,我们将运行多个线程,这些线程将尝试随机数字,直到找到一个有效的数字。
当然,下面的输出是一个示例运行,并且每次运行程序时都会发生变化。
(* use for termination *)
let stop_all_threads = Atomic.make false
(* total number of individual attempts to find a number *)
let num_attempts = Atomic.make 0
(* find a number that satisfies [p], by... trying random numbers
until one fits. *)
let find_number_where (p:int -> bool) =
let rand = Random.State.make_self_init() in
while not (Atomic.get stop_all_threads) do
let n = Random.State.full_int rand max_int in
ignore (Atomic.fetch_and_add num_attempts 1 : int);
if p (Hashtbl.hash n) then (
Printf.printf "found %d (hash=%d)\n%!" n (Hashtbl.hash n);
Atomic.set stop_all_threads true; (* signal all threads to stop *)
)
done;;
(* run multiple domains to search for a [n] where [hash n <= 100] *)
let () =
let criterion n = n <= 100 in
let threads =
Array.init 8
(fun _ -> Domain.spawn (fun () -> find_number_where criterion))
in
Array.iter Domain.join threads;
Printf.printf "total number of attempts: %d\n%!"
(Atomic.get num_attempts) ;;
- : unit = ()
found 1651745641680046833 (hash=33)
total number of attempts: 30230350
另一个示例是基本的 Treiber 栈(一个线程安全的栈),可以在线程之间安全共享。
请注意,push
和 pop
都是递归的,因为它们尝试将新的栈(多一个或少一个元素)与旧的栈交换。这是乐观并发:例如,push stack x
的每次迭代都会获取旧的栈 l
,并希望在它尝试用 x::l
替换 l
之前,没有其他人有时间修改列表。如果 compare_and_set
失败,这意味着我们过于乐观,必须重试。
type 'a stack = 'a list Atomic.t
let rec push (stack: _ stack) elt : unit =
let cur = Atomic.get stack in
let success = Atomic.compare_and_set stack cur (elt :: cur) in
if not success then
push stack elt
let rec pop (stack: _ stack) : _ option =
let cur = Atomic.get stack in
match cur with
| [] -> None
| x :: tail ->
let success = Atomic.compare_and_set stack cur tail in
if success then Some x
else pop stack
# let st = Atomic.make []
# push st 1
- : unit = ()
# push st 2
- : unit = ()
# pop st
- : int option = Some 2
# pop st
- : int option = Some 1
# pop st
- : int option = None