|
| 1 | +# Proposal: Concurrency on Nash |
| 2 | + |
| 3 | +There has been some discussion on how to provide concurrency to nash. |
| 4 | +There is a [discussion here](https://github.com/NeowayLabs/nash/issues/224) |
| 5 | +on how concurrency could be added as a set of built-in functions. |
| 6 | + |
| 7 | +As we progressed discussing it seemed desirable to have a concurrency |
| 8 | +that enforced no sharing between concurrent functions. It eliminates |
| 9 | +races and forces all communication to happen explicitly, and the |
| 10 | +performance overhead would not be a problem to a high level language |
| 11 | +as nash. |
| 12 | + |
| 13 | +## Lightweight Processes |
| 14 | + |
| 15 | +This idea is inspired on Erlang concurrency model. Since Nash does |
| 16 | +not aspire to do everything that Erlang does (like distributed programming) |
| 17 | +so this is not a copy, we just take some things as inspiration. |
| 18 | + |
| 19 | +Why call this a process ? |
| 20 | +On the [Erlang docs](http://erlang.org/doc/getting_started/conc_prog.html) |
| 21 | +there is a interesting definition of process: |
| 22 | + |
| 23 | +``` |
| 24 | +the term "process" is usually used when the threads of execution share no |
| 25 | +data with each other and the term "thread" when they share data in some way. |
| 26 | +Threads of execution in Erlang share no data, |
| 27 | +that is why they are called processes |
| 28 | +``` |
| 29 | + |
| 30 | +In this context the process word is used to mean a concurrent thread of |
| 31 | +execution that does not share any data. The only means of communication |
| 32 | +are through message passing. Since these processes are lightweight |
| 33 | +creating a lot of them will be cheap (at least must cheaper than |
| 34 | +OS processes). |
| 35 | + |
| 36 | +Instead of using channel instances in this model you send messages |
| 37 | +to processes (actor model), it works pretty much like a networking |
| 38 | +model using UDP datagrams. |
| 39 | + |
| 40 | +The idea is to leverage this as a syntactic construction of the language |
| 41 | +to make it as explicit and easy as possible to use. |
| 42 | + |
| 43 | +This idea introduces 4 new concepts, 3 built-in functions and one |
| 44 | +new keyword. |
| 45 | + |
| 46 | +The keyword **spawn** is used to spawn a function as a new process. |
| 47 | +The function **send** is used to send messages to a process. |
| 48 | +The function **receive** is used to receive messages from a process. |
| 49 | +The function **self** returns the pid of the process calling it. |
| 50 | + |
| 51 | +An example of a simple ping/pong: |
| 52 | + |
| 53 | +``` |
| 54 | +pid <= spawn fn () { |
| 55 | + ping, senderpid <= receive() |
| 56 | + echo $ping |
| 57 | + send($senderpid, "pong") |
| 58 | +}() |
| 59 | +
|
| 60 | +send($pid, "ping", self()) |
| 61 | +pong <= receive() |
| 62 | +
|
| 63 | +echo $pong |
| 64 | +``` |
| 65 | + |
| 66 | +Spawned functions can also receive parameters (always deep copies): |
| 67 | + |
| 68 | +``` |
| 69 | +pid <= spawn fn (answerpid) { |
| 70 | + send($answerpid, "pong") |
| 71 | +}(self()) |
| 72 | +
|
| 73 | +pong <= receive() |
| 74 | +echo $pong |
| 75 | +``` |
| 76 | + |
| 77 | +A simple fan-out/fan-in implementation (N jobs <-> N processes): |
| 78 | + |
| 79 | +``` |
| 80 | +jobs = ("1" "2" "3" "4" "5") |
| 81 | +
|
| 82 | +for job in $jobs { |
| 83 | + spawn fn (job, answerpid) { |
| 84 | + import io |
| 85 | +
|
| 86 | + io_println("job[%s] done", $job) |
| 87 | + send($answerpid, format("result [%s]", $job)) |
| 88 | + }($job, self()) |
| 89 | +} |
| 90 | +
|
| 91 | +for job in $jobs { |
| 92 | + result <= receive() |
| 93 | + echo $result |
| 94 | +} |
| 95 | +``` |
| 96 | + |
| 97 | +All output (stdout and stderr) of processes go to their |
| 98 | +parent until the root (main) process, so printing inside |
| 99 | +a child process will print on the stdout of the main process. |
| 100 | + |
| 101 | +### Advanced Fan-out Fan-in |
| 102 | + |
| 103 | +Here is an example of a more elaborated fan-out/fan-in. |
| 104 | +On this case we have much more jobs to execute than |
| 105 | +workers, so it requires more coordination than the previous example. |
| 106 | + |
| 107 | +For brevity this example does not handle timeouts. |
| 108 | + |
| 109 | +Lets suppose an script that tries different passwords on a host: |
| 110 | + |
| 111 | +``` |
| 112 | +var passwords_feed <= spawn fn() { |
| 113 | +
|
| 114 | + fn sendpassword(password) { |
| 115 | + var worker <= receive() |
| 116 | + if !send($worker, $password) { |
| 117 | + sendpassword($password) |
| 118 | + } |
| 119 | + } |
| 120 | +
|
| 121 | + for password in generate_passwords() { |
| 122 | + sendpassword($password) |
| 123 | + } |
| 124 | +} |
| 125 | +
|
| 126 | +fn login(output, passwords_feed, done) { |
| 127 | +
|
| 128 | + for send($passwords_feed, self()) { |
| 129 | + var password = receive() |
| 130 | + var result <= login "someuser" $password |
| 131 | + send($output, $result) |
| 132 | + } |
| 133 | +
|
| 134 | + send($done, "done") |
| 135 | +} |
| 136 | +
|
| 137 | +fn outputhandler() { |
| 138 | + for { |
| 139 | + var result = receive() |
| 140 | + if $result == "0" { |
| 141 | + echo "success" |
| 142 | + } |
| 143 | + } |
| 144 | +} |
| 145 | +
|
| 146 | +var workers = 10 |
| 147 | +
|
| 148 | +var feed <= spawn passwords_feed() |
| 149 | +var outputhandler <= spawn outputhandler() |
| 150 | +
|
| 151 | +for i in range(0, $workers) { |
| 152 | + spawn login($outputhandler, $feed, self()) |
| 153 | +} |
| 154 | +
|
| 155 | +for i in range(0, $workers) { |
| 156 | + msg <= receive() |
| 157 | + if $msg != "done" { |
| 158 | + echo "dafuck ?" |
| 159 | + } |
| 160 | +} |
| 161 | +``` |
| 162 | + |
| 163 | +### Error Handling |
| 164 | + |
| 165 | +Error handling on this concurrency model is very similar to |
| 166 | +how we do it on a distributed system. If a remote service fails and |
| 167 | +just dies and you are using UDP you will never be informed of it, |
| 168 | +the behavior will be to timeout the request and try again (possibly |
| 169 | +to another service instance through a load balancer). |
| 170 | + |
| 171 | +To implement this idea we can add a timeout to the receive an add |
| 172 | +a new parameter, a boolean, indicating if there is a message or if a |
| 173 | +timeout has occurred. |
| 174 | + |
| 175 | +Example: |
| 176 | + |
| 177 | +``` |
| 178 | +msg, ok <= receive(timeout) |
| 179 | +if !ok { |
| 180 | + echo "oops timeout" |
| 181 | +} |
| 182 | +``` |
| 183 | + |
| 184 | +The timeout can be omitted if you wish to just wait forever. |
| 185 | + |
| 186 | +For send operations we need to add just one boolean return |
| 187 | +value indicating if the process pid exists and the message |
| 188 | +has been delivered: |
| 189 | + |
| 190 | +``` |
| 191 | +if !send($pid, $msg) { |
| 192 | + echo "oops message cant be sent" |
| 193 | +} |
| 194 | +``` |
| 195 | + |
| 196 | +Since the processes are always local there is no need for a more |
| 197 | +detailed error message (the message would always be the same), the |
| 198 | +error will always involve a pid that has no owner (the process never |
| 199 | +existed or already exited). |
| 200 | + |
| 201 | +We could add a more specific error message if we decide that |
| 202 | +the process message queue can get too big and we start to |
| 203 | +drop messages. The error would help to differentiate |
| 204 | +from a dead process or a overloaded process. |
| 205 | + |
| 206 | +An error indicating a overloaded process could help |
| 207 | +to implement back pressure logic (try again later). |
| 208 | +But if we are sticking with local concurrency only this |
| 209 | +may be unnecessary complexity. You can avoid this by |
| 210 | +always sending N messages and waiting for N responses |
| 211 | +before sending more messages. |
| 212 | + |
| 213 | +### TODO |
| 214 | + |
| 215 | +Spawned functions should have access to imported modules ? |
| 216 | +(seems like no, but some usages of this may seem odd) |
| 217 | + |
| 218 | +If send is never blocking, what if process queue gets too big ? |
| 219 | +just go on until memory exhausts ? |
| 220 | + |
| 221 | +Should send be synchronous how we are going to differentiate |
| 222 | +between a timeout or a invalid pid error ? On the other hand |
| 223 | +synchronous send solves the queueing problem. |
| 224 | + |
| 225 | +## Extend rfork |
| 226 | + |
| 227 | +Converging to a no shared state between concurrent functions initiated |
| 228 | +the idea of using the current rfork built-in as a means to express |
| 229 | +concurrency on Nash. This would already be possible today, the idea |
| 230 | +is just to make it even easier, specially the communication between |
| 231 | +different concurrent processes. |
| 232 | + |
| 233 | +This idea enables an even greater amount of isolation between concurrent |
| 234 | +processes since rfork enables different namespaces isolation (besides memory), |
| 235 | +but it has the obvious fallback of not being very lightweight. |
| 236 | + |
| 237 | +Since the idea of nash is to write simple scripts this does not seem |
| 238 | +to be a problem. If it is on the future we can create lightweight concurrent |
| 239 | +processes (green threads) that works orthogonally with rfork. |
| 240 | + |
| 241 | +The prototype for the new rfork would be something like this: |
| 242 | + |
| 243 | +```sh |
| 244 | +chan <= rfork [ns_param1, ns_param2] (chan) { |
| 245 | + //some code |
| 246 | +} |
| 247 | +``` |
| 248 | + |
| 249 | +The code on the rfork block does not have access to the |
| 250 | +lexical outer scope but it receives as a parameter a channel |
| 251 | +instance. |
| 252 | + |
| 253 | +This channel instance can be used by the forked processes and |
| 254 | +by the creator of the process to communicate. We could use built-in functions: |
| 255 | + |
| 256 | +```sh |
| 257 | +chan <= rfork [ns_param1, ns_param2] (chan) { |
| 258 | + cwrite($chan, "hi") |
| 259 | +} |
| 260 | + |
| 261 | +a <= cread($chan) |
| 262 | +``` |
| 263 | + |
| 264 | +Or some syntactic extension: |
| 265 | + |
| 266 | +```sh |
| 267 | +chan <= rfork [ns_param1, ns_param2] (chan) { |
| 268 | + $chan <- "hi" |
| 269 | +} |
| 270 | + |
| 271 | +a <= <-$chan |
| 272 | +``` |
| 273 | + |
| 274 | +Since this channel is meant only to be used to communicate with |
| 275 | +the created process, it will be closed when the process exit: |
| 276 | + |
| 277 | +```sh |
| 278 | +chan <= rfork [ns_param1, ns_param2] (chan) { |
| 279 | +} |
| 280 | + |
| 281 | +# returns empty string when channel is closed |
| 282 | +<-$chan |
| 283 | +``` |
| 284 | + |
| 285 | +Fan out and fan in should be pretty trivial: |
| 286 | + |
| 287 | +```sh |
| 288 | +chan1 <= rfork [ns_param1, ns_param2] (chan) { |
| 289 | +} |
| 290 | + |
| 291 | +chan2 <= rfork [ns_param1, ns_param2] (chan) { |
| 292 | +} |
| 293 | + |
| 294 | +# waiting for both to finish |
| 295 | +<-$chan1 |
| 296 | +<-$chan2 |
| 297 | +``` |
0 commit comments