module Atomic: Atomic
type !'a
t
对类型 'a
的值的原子(可变)引用。
val make : 'a -> 'a t
创建原子引用。
val make_contended : 'a -> 'a t
创建一个在缓存行上单独存在的原子引用。它占用分配给 make v
的内存的 4-16 倍。
主要目的是防止错误共享和由此导致的性能下降。当 CPU 执行原子操作时,它会暂时拥有包含原子引用的整个缓存行。如果多个原子引用共享同一个缓存行,则同时修改这些不相关的内存区域变得不可能,这可能会造成瓶颈。因此,作为一般准则,如果原子引用正在经历竞争,将其分配到自己的缓存行可能会提高性能。
val get : 'a t -> 'a
获取原子引用的当前值。
val set : 'a t -> 'a -> unit
为原子引用设置新值。
val exchange : 'a t -> 'a -> 'a
为原子引用设置新值,并返回当前值。
val compare_and_set : 'a t -> 'a -> 'a -> bool
compare_and_set r seen v
仅当 r
的当前值与 seen
相同(物理上相等)时,才将 r
的新值设置为 v
-- 比较和设置操作是原子性的。如果比较成功(因此设置成功)则返回 true
,否则返回 false
。
val fetch_and_add : int t -> int -> int
fetch_and_add r n
原子地将 r
的值增加 n
,并返回当前值(增加之前的值)。
val incr : int t -> unit
incr r
原子地将 r
的值增加 1
。
val decr : int t -> unit
decr r
原子地将 r
的值减少 1
。
一个基本的用例是拥有以线程安全的方式更新的全局计数器,例如跟踪程序执行的 I/O 的一些指标。另一个基本的用例是协调给定程序中线程的终止,例如当一个线程找到答案时,或者当程序由用户关闭时。
例如,在这里,我们将尝试找到一个散列满足基本属性的数字。为此,我们将运行多个线程,这些线程将尝试随机数字,直到找到一个有效的数字。
当然,下面的输出只是一个示例运行,并且每次运行程序时都会改变。
(* use for termination *)
let stop_all_threads = Atomic.make false
(* total number of individual attempts to find a number *)
let num_attempts = Atomic.make 0
(* find a number that satisfies [p], by... trying random numbers
until one fits. *)
let find_number_where (p:int -> bool) =
let rand = Random.State.make_self_init() in
while not (Atomic.get stop_all_threads) do
let n = Random.State.full_int rand max_int in
ignore (Atomic.fetch_and_add num_attempts 1 : int);
if p (Hashtbl.hash n) then (
Printf.printf "found %d (hash=%d)\n%!" n (Hashtbl.hash n);
Atomic.set stop_all_threads true; (* signal all threads to stop *)
)
done;;
(* run multiple domains to search for a [n] where [hash n <= 100] *)
let () =
let criterion n = n <= 100 in
let threads =
Array.init 8
(fun _ -> Domain.spawn (fun () -> find_number_where criterion))
in
Array.iter Domain.join threads;
Printf.printf "total number of attempts: %d\n%!"
(Atomic.get num_attempts) ;;
- : unit = ()
found 1651745641680046833 (hash=33)
total number of attempts: 30230350
另一个例子是基本的 Treiber 栈(一个线程安全的栈),它可以安全地在线程之间共享。
注意 push
和 pop
都是递归的,因为它们试图用新栈(多一个或少一个元素)替换旧栈。这是乐观并发:例如,push stack x
的每次迭代都会获取旧栈 l
,并希望在它尝试用 x::l
替换 l
之前,没有其他人有时间修改列表。如果 compare_and_set
失败,这意味着我们过于乐观,必须重试。
type 'a stack = 'a list Atomic.t
let rec push (stack: _ stack) elt : unit =
let cur = Atomic.get stack in
let success = Atomic.compare_and_set stack cur (elt :: cur) in
if not success then
push stack elt
let rec pop (stack: _ stack) : _ option =
let cur = Atomic.get stack in
match cur with
| [] -> None
| x :: tail ->
let success = Atomic.compare_and_set stack cur tail in
if success then Some x
else pop stack
# let st = Atomic.make []
# push st 1
- : unit = ()
# push st 2
- : unit = ()
# pop st
- : int option = Some 2
# pop st
- : int option = Some 1
# pop st
- : int option = None