1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
-module(gleam@otp@task).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function]).
-export([async/1, try_await/2, await/2, try_await_forever/1, await_forever/1]).
-export_type([task/1, await_error/0, message/1]).
-opaque task(FWJ) :: {task,
gleam@erlang@process:pid_(),
gleam@erlang@process:pid_(),
gleam@erlang@process:process_monitor(),
gleam@erlang@process:selector(message(FWJ))}.
-type await_error() :: timeout | {exit, gleam@dynamic:dynamic_()}.
-type message(FWK) :: {from_monitor, gleam@erlang@process:process_down()} |
{from_subject, FWK}.
-spec async(fun(() -> FWL)) -> task(FWL).
async(Work) ->
Owner = erlang:self(),
Subject = gleam@erlang@process:new_subject(),
Pid = gleam@erlang@process:start(
fun() -> gleam@erlang@process:send(Subject, Work()) end,
true
),
Monitor = gleam@erlang@process:monitor_process(Pid),
Selector = begin
_pipe = gleam_erlang_ffi:new_selector(),
_pipe@1 = gleam@erlang@process:selecting_process_down(
_pipe,
Monitor,
fun(Field@0) -> {from_monitor, Field@0} end
),
gleam@erlang@process:selecting(
_pipe@1,
Subject,
fun(Field@0) -> {from_subject, Field@0} end
)
end,
{task, Owner, Pid, Monitor, Selector}.
-spec assert_owner(task(any())) -> nil.
assert_owner(Task) ->
Self = erlang:self(),
case erlang:element(2, Task) =:= Self of
true ->
nil;
false ->
gleam@erlang@process:send_abnormal_exit(
Self,
<<"awaited on a task that does not belong to this process"/utf8>>
)
end.
-spec try_await(task(FWP), integer()) -> {ok, FWP} | {error, await_error()}.
try_await(Task, Timeout) ->
assert_owner(Task),
case gleam_erlang_ffi:select(erlang:element(5, Task), Timeout) of
{ok, {from_subject, X}} ->
gleam_erlang_ffi:demonitor(erlang:element(4, Task)),
{ok, X};
{ok, {from_monitor, {process_down, _, Reason}}} ->
{error, {exit, Reason}};
{error, nil} ->
{error, timeout}
end.
-spec await(task(FWT), integer()) -> FWT.
await(Task, Timeout) ->
_assert_subject = try_await(Task, Timeout),
{ok, Value} = case _assert_subject of
{ok, _} -> _assert_subject;
_assert_fail ->
erlang:error(#{gleam_error => let_assert,
message => <<"Assertion pattern match failed"/utf8>>,
value => _assert_fail,
module => <<"gleam/otp/task"/utf8>>,
function => <<"await"/utf8>>,
line => 117})
end,
Value.
-spec try_await_forever(task(FWV)) -> {ok, FWV} | {error, await_error()}.
try_await_forever(Task) ->
assert_owner(Task),
case gleam_erlang_ffi:select(erlang:element(5, Task)) of
{from_subject, X} ->
gleam_erlang_ffi:demonitor(erlang:element(4, Task)),
{ok, X};
{from_monitor, {process_down, _, Reason}} ->
{error, {exit, Reason}}
end.
-spec await_forever(task(FWZ)) -> FWZ.
await_forever(Task) ->
_assert_subject = try_await_forever(Task),
{ok, Value} = case _assert_subject of
{ok, _} -> _assert_subject;
_assert_fail ->
erlang:error(#{gleam_error => let_assert,
message => <<"Assertion pattern match failed"/utf8>>,
value => _assert_fail,
module => <<"gleam/otp/task"/utf8>>,
function => <<"await_forever"/utf8>>,
line => 149})
end,
Value.
|