第 12 章 语言扩展

24 效果处理器

(在 5.0 中引入)

注意:OCaml 5.0 中的效果处理器应被视为实验性的。效果处理器在标准库的 Effect 模块中作为其在运行时实现的薄包装器公开。它们不受支持作为具有新语法的语言特性。您可以依靠它们来构建非本地控制流抽象,例如不向用户公开效果处理器原语的用户级线程。将来可能会发生重大更改。

效果处理器是一种用于使用用户定义效果进行模块化编程的机制。效果处理器允许程序员描述执行有效果操作计算,其含义由处理器描述,这些处理器包含计算。效果处理器是异常处理器的泛化,并使诸如可恢复异常、轻量级线程、协程、生成器和异步 I/O 等非本地控制流机制能够以可组合的方式表达。在本教程中,我们将了解如何使用效果处理器构建其中一些机制。

24.1 基础

为了理解基础知识,让我们定义一个效果(即一个操作),它接受一个整数参数并返回一个整数结果。我们将此效果命名为Xchg

open Effect open Effect.Deep type _ Effect.t += Xchg: int -> int t let comp1 () = perform (Xchg 0) + perform (Xchg 1)

我们通过使用新的构造函数Xchg: int -> int t扩展预定义的可扩展变体类型Effect.t来声明交换效果Xchg。该声明可以直观地理解为“Xchg效果采用整数参数,并且当执行此效果时,它会返回一个整数”。计算comp1使用perform原语两次执行该效果并返回它们的和。

我们可以通过实现一个始终返回提供的值的后续者的处理器来处理Xchg效果

try_with comp1 () { effc = fun (type a) (eff: a t) -> match eff with | Xchg n -> Some (fun (k: (a, _) continuation) -> continue k (n+1)) | _ -> None }
- : int = 3

try_with在处理Xchg效果的效果处理器下运行计算comp1 ()。如前所述,效果处理器是异常处理器的泛化。类似于异常处理器,当计算执行Xchg效果时,控制权会跳转到相应的处理器。但是,与异常处理器不同的是,处理器还提供分隔的延续k,它表示从perform点到此处理器的挂起计算。

处理器使用continue原语使用提供的值的后续者恢复挂起的计算。在此示例中,计算comp1执行Xchg 0Xchg 1,并分别从处理器接收值12。因此,整个表达式计算结果为3

需要注意的是,我们必须在效果处理器中使用局部抽象类型(type a)。类型Effect.t是 GADT,并且不同效果的效果声明可能具有不同的类型参数。类型a Effect.t中的类型参数a表示执行效果时返回的值的类型。从eff具有类型a Effect.t的事实以及Xchg n具有类型int Effect.t的事实,类型检查器推断a必须是int,这就是为什么我们被允许将整数n+1作为参数传递给continue k

另一个需要注意的是,当处理效果时,“| _ -> None”这种全匹配情况是必要的。此情况可以直观地理解为“将未处理的效果转发到外部处理器”。

在此示例中,我们使用效果处理器的深层版本,而不是浅层版本。深层处理器监视计算,直到计算终止(正常或通过异常),并处理计算按顺序执行的所有效果。相反,浅层处理器监视计算,直到计算终止或计算执行一个效果,并且它仅处理此单个效果。在适用它们的情况下,通常首选深层处理器。稍后在 ‍12.24.6中讨论了一个利用浅层处理器的示例。

限制

OCaml 的效果是同步的:无法从信号处理程序、终结器、memprof 回调或 GC 警报异步执行效果,并从代码的主体捕获它。相反,这将导致Effect.Unhandled异常(12.24.5)。

同样,效果与使用从 C 到 OCaml 的回调不兼容(第 ‍22.7节)。效果无法跨越对caml_callback的调用,这将导致Effect.Unhandled异常。特别是,在混合使用从 C 到 OCaml 的回调的库和使用效果的库时必须小心。

24.2 并发

效果处理器的表达能力来自分隔的延续。虽然前面的示例立即恢复了计算,但计算可能会稍后恢复,在此期间运行其他一些计算。让我们扩展前面的示例,并使用Xchg效果实现两个并发计算之间的消息传递并发。我们将这些并发计算称为任务

任务要么处于挂起状态,要么已完成。我们如下表示任务状态

type 'a status = Complete of 'a | Suspended of {msg: int; cont: (int, 'a status) continuation}

任务要么已完成,结果类型为'a,要么已挂起,带要发送的消息msg和延续cont。类型(int,'a status) continuation表示挂起的计算期望一个int值来恢复,并在恢复时返回一个'a status值。

接下来,我们定义一个step函数,该函数执行计算的一步,直到它完成或挂起

let step (f : unit -> 'a) () : 'a status = match_with f () { retc = (fun v -> Complete v); exnc = raise; effc = fun (type a) (eff: a t) -> match eff with | Xchg msg -> Some (fun (cont: (a, _) continuation) -> Suspended {msg; cont}) | _ -> None }

step 函数的参数 f 是一个可以执行 Xchg 效果并返回类型为 'a 的结果的计算。 step 函数本身返回一个 'a status 值。

step 函数中,我们使用了 match_with 原语。类似于 try_withmatch_with 原语安装了一个效果处理器。但是,与仅提供效果情况 effctry_with 不同,match_with 期望值 (retc) 和异常 (exnc) 返回情况的处理器。实际上,try_with 可以使用 match_with 定义如下:let try_with f v {effc} = match_with f v {retc = Fun.id; exnc = raise; effc}

step 函数中,

由于 step 函数处理 Xchg 效果,因此 step f 是一个不执行 Xchg 效果的计算。但是,它可能会执行其他效果。此外,由于我们使用的是深层处理器,因此状态中存储的延续 cont 不会执行 Xchg 效果。

现在我们可以编写一个简单的调度程序,将一对任务运行到完成

let rec run_both a b = match a (), b () with | Complete va, Complete vb -> (va, vb) | Suspended {msg = m1; cont = k1}, Suspended {msg = m2; cont = k2} -> run_both (fun () -> continue k1 m2) (fun () -> continue k2 m1) | _ -> failwith "Improper synchronization"

这两个任务都可能运行到完成,或者都可能提供交换消息。在后一种情况下,每个计算都会接收另一个计算提供的价值。一个计算提供交换而另一个计算终止的情况被视为程序员错误,并导致处理器引发异常

现在我们可以定义第二个也交换两个消息的计算

let comp2 () = perform (Xchg 21) * perform (Xchg 21)

最后,我们可以将这两个计算一起运行

run_both (step comp1) (step comp2)
- : int * int = (42, 0)

计算 comp1 提供值 01,并交换接收值 2121,将其相加,产生 42。计算 comp2 提供值 2121,并交换接收值 01,将其相乘,产生 0。这两个计算之间的通信完全在 run_both 内部编程。实际上,仅 comp1comp2 的定义本身并没有为 Xchg 效果分配任何含义。

24.3 用户级线程

让我们扩展前面的示例以适用于任意数量的任务。许多语言(如 GHC Haskell 和 Go)提供用户级线程作为在运行时系统中实现的原始功能。使用效果处理器,用户级线程及其调度程序可以在 OCaml 本身中实现。通常,用户级线程系统提供一个 fork 原语来生成一个新的并发任务,以及一个 yield 原语来将控制权让给其他任务。相应地,我们将声明两个效果如下

type _ Effect.t += Fork : (unit -> unit) -> unit t | Yield : unit t

Fork 效果接受一个 thunk(一个挂起的计算,表示为类型为 unit -> unit 的函数)并向执行者返回一个单元。 Yield 效果是无参数的,并在执行时返回一个单元。让我们考虑一个执行 Xchg 效果的任务可能与任何其他也提供交换值的匹配任务。

我们还将定义一些辅助函数,这些函数仅执行这些效果

let fork f = perform (Fork f) let yield () = perform Yield let xchg v = perform (Xchg v)

顶级 run 函数定义了调度程序

(* 一个并发循环调度程序 *) let run (main : unit -> unit) : unit = let exchanger = ref None in (* 等待交换器 *) let run_q = Queue.create () in (* 调度程序队列 *) let enqueue k v = let task () = continue k v in Queue.push task run_q in let dequeue () = if Queue.is_empty run_q then () (* 完成 *) else begin let task = Queue.pop run_q in task () end in let rec spawn (f : unit -> unit) : unit = match_with f () { retc = dequeue; exnc = (fun e -> print_endline (Printexc.to_string e); dequeue ()); effc = fun (type a) (eff : a t) -> match eff with | Yield -> Some (fun (k : (a, unit) continuation) -> enqueue k (); dequeue ()) | Fork f -> Some (fun (k : (a, unit) continuation) -> enqueue k (); spawn f) | Xchg n -> Some (fun (k : (int, unit) continuation) -> begin match !exchanger with | Some (n', k') -> exchanger := None; enqueue k' n; continue k n' | None -> exchanger := Some (n, k); dequeue () end) | _ -> None } in spawn main

我们使用一个可变队列 run_q 来保存调度程序队列。FIFO 队列允许在调度程序中循环调度任务。enqueue 将任务插入队列,dequeue 从队列中提取任务并运行它们。引用单元 exchanger 保存一个(挂起的)提供交换值的的任务。在任何时候,要么有零个,要么有一个挂起的任务正在提供交换。

spawn 函数完成了繁重的工作。spawn 函数在效果处理器中运行给定的计算 f。如果 f 返回一个值(情况 retc),我们从调度程序队列中出队并运行下一个任务。如果计算 f 引发异常(情况 exnc),我们打印异常并从调度程序中运行下一个任务。

计算 f 也可能执行效果。如果 f 执行 Yield 效果,则当前任务被挂起(插入到就绪任务队列中),并且从调度程序队列中运行下一个任务。如果效果是 Fork f,则当前任务被挂起,并且新任务 f 通过对 spawn f 的尾递归立即执行。请注意,首先运行新任务的选择是任意的。我们也可以选择将 f 的任务插入就绪队列并立即恢复 k

如果效果是 Xchg,那么我们首先检查是否有任务在等待交换。如果有,我们用当前提供的价值将等待的任务入队,并立即用提供的价值恢复当前任务。如果没有,我们将当前任务设为等待交换器,并从调度程序队列中运行下一个任务。

请注意,此调度程序代码并不完美——它可能会泄漏资源。我们将在下一节 ‍12.24.3 中解释并修复此问题。

现在我们可以编写一个利用新定义的操作的并发程序

open Printf let _ = run (fun _ -> fork (fun _ -> printf "[t1] Sending 0\n"; let v = xchg 0 in printf "[t1] received %d\n" v); fork (fun _ -> printf "[t2] Sending 1\n"; let v = xchg 1 in printf "[t2] received %d\n" v))
[t1] Sending 0 [t2] Sending 1 [t2] received 0 [t1] received 1

观察到来自这两个任务的消息是交错的。还要注意,上面的代码片段没有引用效果处理器,并且是直接风格(没有单子操作)。此示例说明,使用效果处理器,并发程序中的用户代码可以保持简单的直接风格,并且效果处理器的使用可以完全包含在并发库实现中。

使用异常恢复

除了用值恢复延续之外,效果处理器还允许通过在执行时引发效果来恢复。这是在 discontinue 原语的帮助下完成的。discontinue 原语有助于确保即使存在效果,资源最终也会被释放。

例如,考虑前面示例中重新生成的出队操作

let dequeue () = if Queue.is_empty run_q then () (* 完成 *) else (Queue.pop run_q) ()

如果调度器队列为空,则 dequeue 认为调度器已完成并返回调用方。但是,可能仍然有一个任务正在等待交换值(存储在引用单元格 exchanger 中),这将永远阻塞!如果阻塞的任务持有资源,则这些资源将泄漏。例如,考虑以下任务

let leaky_task () = fork (fun _ -> let oc = open_out "secret.txt" in Fun.protect ~finally:(fun _ -> close_out oc) (fun _ -> output_value oc (xchg 0)))

该任务将接收到的消息写入文件 secret.txt。它使用 Fun.protect 确保输出通道 oc 在正常和异常返回情况下都被关闭。不幸的是,这还不够。如果交换效果 xchg 0 无法与其他某个线程执行的交换效果匹配,则此任务将永远阻塞。因此,输出通道 oc 永远不会关闭。

为了避免此问题,必须遵守一个简单的规则:每个延续最终必须继续或中断。在这里,我们使用 discontinue 来确保阻塞的任务不会永远阻塞。通过中断此任务,我们强制它终止(出现异常)

exception Improper_synchronization let dequeue () = if Queue.is_empty run_q then begin match !exchanger with | None -> () (* done *) | Some (n, k) -> exchanger := None; discontinue k Improper_synchronization end else (Queue.pop run_q) ()

当调度器队列为空且存在阻塞的交换器线程时,dequeue 函数使用 Improper_synchronization 异常中断阻塞的线程。此异常在阻塞的 xchg 函数调用处引发,这会导致运行 finally 块并关闭输出通道 oc。从用户的角度来看,似乎函数调用 xchg 0 引发了异常 Improper_synchronization

24.4 控制反转

在对数据结构进行遍历时,根据生产者或消费者是否控制遍历,有两种基本方法。例如,在 List.iter f l 中,生产者 List.iter 拥有控制权并将元素推送到消费者 f 进行处理。另一方面,Seq 模块提供了一种类似于延迟列表的机制,其中消费者控制遍历。例如,Seq.forever Random.bool 返回一个无限的随机位序列,其中每个位在消费者查询时(按需)产生。

当然,像 List.iter 这样的生产者更容易以前一种方式编写。后一种方式对消费者来说在人体工程学上更好,因为控制权在消费者手中是更可取和更自然的。为了兼得两者的优势,我们希望以前一种方式编写生产者并自动将其转换为后一种方式。由于效果处理程序,转换可以一次性编写为库函数。让我们将此函数命名为 invert。在查看其实现细节之前,我们将首先了解如何使用 invert 函数。此函数的类型如下所示

val invert : iter:(('a -> unit) -> unit) -> 'a Seq.t

invert 函数接受一个 iter 函数(一个将元素推送到消费者的生产者)并返回一个序列(其中消费者拥有控制权)。例如,

let lst_iter = Fun.flip List.iter [1;2;3]
val lst_iter : (int -> unit) -> unit = <fun>

是一个 iter 函数,类型为 (int -> unit) -> unit。表达式 lst_iter f 将元素 1、2 和 3 推送到消费者 f。例如,

lst_iter (fun i -> Printf.printf "%d\n" i)
1 2 3 - : unit = ()

表达式 invert lst_iter 返回一个序列,允许消费者按需遍历列表。例如,

let s = invert ~iter:lst_iter let next = Seq.to_dispenser s;;
val s : int Seq.t = <fun> val next : unit -> int option = <fun>
next();;
- : int option = Some 1
next();;
- : int option = Some 2
next();;
- : int option = Some 3
next();;
- : int option = None

我们可以在任何 iter 函数上使用相同的 invert 函数。例如,

let s = invert ~iter:(Fun.flip String.iter "OCaml") let next = Seq.to_dispenser s;;
val s : char Seq.t = <fun> val next : unit -> char option = <fun>
next();;
- : char option = Some 'O'
next();;
- : char option = Some 'C'
next();;
- : char option = Some 'a'
next();;
- : char option = Some 'm'
next();;
- : char option = Some 'l'
next();;
- : char option = None

实现控制反转

invert 函数的实现如下所示

let invert (type a) ~(iter : (a -> unit) -> unit) : a Seq.t = let module M = struct type _ Effect.t += Yield : a -> unit t end in let yield v = perform (M.Yield v) in fun () -> match_with iter yield { retc = (fun _ -> Seq.Nil); exnc = raise; effc = fun (type b) (eff : b Effect.t) -> match eff with | M.Yield v -> Some (fun (k: (b,_) continuation) -> Seq.Cons (v, continue k)) | _ -> None }

invert 函数声明了一个效果 Yield,它将要生成的元素作为参数。 yield 函数执行 Yield 效果。lambda 抽象 fun () -> ... 将所有操作延迟到请求序列的第一个元素之前。一旦发生这种情况,计算 iter yield 就会在效果处理程序下执行。每次 iter 函数将元素推送到 yield 函数时,计算都会被 Yield 效果中断。 Yield 效果通过向消费者返回值 Seq.Cons(v,continue k) 来处理。消费者获得元素 v 以及挂起的计算,在消费者看来,这只是序列的尾部。

当消费者从序列中请求下一个元素(通过将其应用于 ())时,延续 k 将恢复。这允许计算 iter yield 继续进行,直到它要么生成另一个元素,要么正常终止。在后一种情况下,将返回值 Seq.Nil,指示消费者迭代已结束。

需要注意的是,invert 函数返回的序列是短暂的(由 Seq 模块定义),即序列最多只能使用一次。此外,必须完全使用序列(即至少使用一次),以确保线性使用捕获的延续。

24.5 语义

在本节中,我们将借助示例了解效果处理程序的语义。

嵌套处理程序

与异常处理程序一样,效果处理程序可以嵌套。

type _ Effect.t += E : int t | F : string t let foo () = perform F let bar () = try_with foo () { effc = fun (type a) (eff: a t) -> match eff with | E -> Some (fun (k: (a,_) continuation) -> failwith "impossible") | _ -> None } let baz () = try_with bar () { effc = fun (type a) (eff: a t) -> match eff with | F -> Some (fun (k: (a,_) continuation) -> continue k "Hello, world!") | _ -> None }

在此示例中,计算 foo 执行 F,内部处理程序仅处理 E,外部处理程序处理 F。对 baz 的调用返回 Hello, world!

baz ()
- : string = "Hello, world!"

纤程(Fibers)

为了更好地理解效果处理程序的设计选择及其性能特征,了解效果处理程序的实现方式很有帮助。效果处理程序是借助于运行时管理的、动态增长的栈段来实现的,这些栈段称为纤程。OCaml中的程序栈是由这样的纤程组成的链表。

为评估由效果处理程序包含的计算而分配一个新的纤程。当计算正常返回给调用方(通过返回值或抛出异常)时,纤程被释放。

在前面示例中foo中的perform处,程序栈看起来像这样

+-----+ +-----+ +-----+ | | | | | | | baz |<--| bar |<--| foo | | | | | | | | | | | | | +-----+ +-----+ +-----+ <- stack_pointer

这两个链接对应于程序中的两个效果处理程序。当效果Fbaz中被处理时,程序状态如下所示

+-----+ +-----+ +-----+ | | | | | | +-+ | baz | | bar |<--| foo |<--|k| | | | | | | +-+ +-----+ <- stack_pointer +-----+ +-----+

定界延续k是堆上的一个对象,它引用对应于挂起计算的栈段。捕获延续不涉及复制栈帧。当延续被恢复时,通过将k指向的段链接到当前栈,栈被恢复到先前状态。由于延续捕获和恢复都不需要复制栈帧,因此使用perform挂起执行以及使用continuediscontinue恢复执行都很快。

未处理的效果

与Eff和Koka等语言不同,OCaml中的效果处理程序不提供效果安全性;编译器不会静态地确保程序执行的所有效果都被处理。如果效果没有匹配的处理程序,则在相应的perform处抛出Effect.Unhandled异常。例如,在前面的示例中,bar不处理效果F。因此,当我们运行bar时,会得到Effect.Unhandled F异常。

try bar () with Effect.Unhandled F -> "Saw Effect.Unhandled exception"
- : string = "Saw Effect.Unhandled exception"

线性延续

如前所述 ‍12.24.3,OCaml中的定界延续必须线性使用 – 每个捕获的延续必须恰好使用一个continuediscontinue恢复。尝试多次使用延续会引发Continuation_already_resumed异常。例如

try_with perform (Xchg 0) { effc = fun (type a) (eff : a t) -> match eff with | Xchg n -> Some (fun (k: (a, _) continuation) -> continue k 21 + continue k 21) | _ -> None }
Exception: Stdlib.Effect.Continuation_already_resumed.

向OCaml添加效果处理程序的主要动机是启用并发编程。单次延续足以满足几乎所有并发编程需求。与多次延续相比,它们的实现成本也低得多,因为它们不需要复制栈帧。此外,OCaml程序还可以操作线性资源,例如套接字和文件描述符。如果允许延续恢复多次,则很容易破坏线性规则。由于缺乏对线性的静态检查以及控制流的非局部性,因此调试资源上的此类线性违规将非常困难。因此,OCaml不支持多次延续。

虽然延续的“最多恢复一次”属性通过动态检查来确保,但没有检查来确保延续“至少恢复一次”。用户有责任确保捕获的延续至少恢复一次。不恢复延续会导致分配给纤程的内存以及挂起计算可能持有的任何资源泄漏。

可以在捕获的延续上安装一个终结器以确保释放资源

exception Unwind Gc.finalise (fun k -> try ignore (discontinue k Unwind) with _ -> ()) k

在这种情况下,如果k变得不可达,则终结器确保通过使用Unwind异常终止来展开延续栈,从而允许计算释放资源。但是,终结器的运行时成本远高于捕获延续的成本。因此,建议用户自己确保恰好恢复一次延续,而不是依赖终结器。

24.6 浅处理程序

到目前为止,我们看到的示例都使用了处理程序。深处理程序处理计算(按顺序)执行的所有效果。每当在深处理程序中捕获延续时,捕获的延续也包含处理程序。这意味着,当延续被恢复时,效果处理程序会自动重新安装,并将处理计算将来可能执行的效果。

OCaml还提供处理程序。与深处理程序相比,浅处理程序仅处理计算执行的第一个效果。在浅处理程序中捕获的延续不包含处理程序。这意味着,当延续被恢复时,处理程序不再存在。因此,当延续被恢复时,用户需要提供一个新的效果处理程序(可能是不同的处理程序)来处理计算可能执行的下一个效果。

浅处理程序使表达某些类型的程序变得更容易。让我们实现一个浅处理程序,它对计算强制执行特定的效果序列(协议)。对于此示例,让我们考虑计算可能执行以下效果

type _ Effect.t += Send : int -> unit Effect.t | Recv : int Effect.t

让我们假设我们希望强制执行一个协议,该协议仅允许符合正则表达式(Send;Recv)*;Send?SendRecv效果的交替序列。因此,效果序列[](空序列)、[Send][Send;Recv][Send;Recv;Send]等是允许的,但[Recv][Send;Send][Send;Recv;Recv]等是不允许的。这里的关键观察是处理的效果集会随着时间推移而演变。我们可以使用如下所示的浅处理程序很自然地强制执行此协议

open Effect.Shallow let run (comp: unit -> unit) : unit = let rec loop_send : type a. (a,unit) continuation -> a -> unit = fun k v -> continue_with k v { retc = Fun.id; exnc = raise; effc = fun (type b) (eff : b Effect.t) -> match eff with | Send n -> Some (fun (k: (b,_) continuation) -> loop_recv n k ()) | Recv -> failwith "protocol violation" | _ -> None } and loop_recv : type a. int -> (a,unit) continuation -> a -> unit = fun n k v -> continue_with k v { retc = Fun.id; exnc = raise; effc = fun (type b) (eff : b Effect.t) -> match eff with | Recv -> Some (fun (k: (b,_) continuation) -> loop_send k n) | Send v -> failwith "protocol violation" | _ -> None } in loop_send (fiber comp) ()

函数run执行计算comp,确保它只能执行SendRecv效果的交替序列。浅处理程序使用与深处理程序不同的基本原语。基本原语fiber(在最后一行)接受一个'a -> 'b函数并返回一个('a,'b) Effect.Shallow.continuation。表达式continue_with k v h在处理程序h下使用值v恢复延续k

互递归函数loop_sendloop_recv在不同的处理器下,使用值v恢复给定的延续k。函数loop_send处理Send效应,并尾调用函数loop_recv。如果计算执行Recv效应,则loop_send通过抛出异常来中止计算。类似地,函数loop_recv处理Recv效应,并尾调用函数loop_send。如果计算执行Send效应,则loop_recv中止计算。鉴于浅层处理器中捕获的延续不包含处理器本身,因此在计算comp的动态作用域中,始终只安装了一个处理器。

计算最初由函数loop_send执行(参见上面代码中的最后一行),这确保了计算允许执行的第一个效应是Send效应。请注意,计算可以自由执行除SendRecv之外的其他效应,这些效应可能由外部处理器处理。

我们可以看到,函数run将允许遵循以下协议的计算

run (fun () -> printf "Send 42\n"; perform (Send 42); printf "Recv: %d\n" (perform Recv); printf "Send 43\n"; perform (Send 43); printf "Recv: %d\n" (perform Recv))
Send 42 Recv: 42 Send 43 Recv: 43 - : unit = ()

并中止不遵循协议的计算

run (fun () -> Printf.printf "Send 0\n"; perform (Send 0); Printf.printf "Send 1\n"; perform (Send 1) (* 协议违规 *))
Send 0 Send 1 Exception: Failure "protocol violation".

我们可以使用深层处理器和引用单元(简单但不太令人满意)或不使用它们(更难)来实现相同的示例。我们将此留作读者的练习。