Устройство и комуникация между процеси
- Actor моделът и Elixir процесите
- Устройство на процес
- Процеси очакващи множество съобщения
- Използване на receive с timeout
- Освобождаване на паметта на процесите (GC)
- Всичко е Actor. Подобно на обектно-ориентираната идеология : 'всичко е обект'. |
- Всеки Actor чака за съобщения. Когато получи съобщение, той може: |
- Да изпрати краен брой съобщения към други Actor-и. |
- Да създаде краен брой нови Actor-и. |
- Да определи поведение, което ще се изпълни, когато получи следващо съобщение, адресирано към него. |
- Тези три действия нямат определен ред и могат да са пралелни.
- Тези съобщения се предават асинхронно. |
- Всеки Actor си има адрес, понякога наричан пощенска кутия (mail box). |
- Actor-и могат да комуникират само и единствено когато знаят адресите си. |
- В Elixir не всичко е процес.
- Всяко парче код се изпълнява в процес, но типовете данни не са процеси.
- В Elixir при получаване на съобщения, кодът в самият процес е последователен.
- Процесите в Elixir се държат като Actor-и.
- Чакат за съобщения от други процеси и реагират на тях.
- При получено съобщение, могат да изпратят нови съобщения до други процеси.
- При получено съобщение, могат да създадат краен брой други процеси.
- При получено съобщение, могат да заложат поведение за следващо съобщение.
- Процесите в Elixir имат опашка за получените съобщения - 'mail box'.
- Процесите в еликсир имат адреси (PID) и само процеси, знаещи адресите си могат да комуникират помежду си.
- Тук се държи адресът на процес, PID-а.
- Състояние - дали чака или пък се изпълнява в момента.
- Възможно е да реферираме процес и по име, което също се държи тук.
defmodule Responder do
def run do
Process.register(self(), :responder)
receive do
{pid, :ping} when is_pid(pid) ->
send(pid, :pong)
{pid, anything} when is_pid(pid) ->
send(pid, "I received #{anything}.")
end
end
end@[3] @[5-10] @[6-7] @[8-9]
spawn(Responder, :run, [])
send(:responder, {self(), :ping})
receive do
:pong -> IO.puts("PONG!")
end
# PONG!@[1] @[3] @[5-7]
Можем да видим всички регистрирани имена с Process.registered/0.
spawn(Responder, :run, [])
:responder |> Process.whereis |> Process.info(:registered_name)
# {:registered_name, :responder}@[1] @[3]
- Всеки процес си има собствен Stack.
- При създаването на процеса този стек е изключително малък, но може да расте. |
- В стека се пазят данни с големина максимум една дума |
- Както виждате на диаграмата, докато има свободно място, stack-ът може да се разширява надолу. |
- При стартиране на процеса, heap-ът му също е малък, но може да се разширява нагоре.
- Тук се намира и опашката от идващи съобщения на процеса. |
- Тук се пазят непроменимите структури като списъци, кортежи, както и числа с плаваща запетая, малки binary-та (под 64 байта). |
- За по-големите, Refc binary-а се пазят само указателите ProcBin. |
- Може да се разширява, докато има памет за това.
- В опашката са съобщенията, които процесът е получил в реда на пристигането си. |
- Ако няма съобщения в опашката, receive блокира процеса и чака, докато се получи поне едно ново съобщение. |
- Когато пристигне ново съобщение, то се слага в опашката.
- Когато процесът стане активен, съобщението се match-ва към условията в receive. |
- Ако има успех, то се премахва от опашката. |
- Ако съобщението не успее да се match-не, то се запазва за изчакване. |
- Този алгоритъм се повтаря за следващото съобщение и така, докато опашката стане празна.
- В този момент изчакващите съобщения се връщат в опашката, и ще бъдат съпоставени на клаузите в receive при получаване на следващо съобщение.
- Колкото повече такива не-match-нати съобщения се застоят в опашката, толкова по-бавен ще става receive алгоритъмът.
- Освен това, те ще заемат място в паметта на процеса.
spawn(Responder, :run, [])
1..200 |> Enum.map(fn n -> send(:responder, "junk#{n}") end)
:responder |> Process.whereis |> Process.info(:messages)
#=> {:messages, [...]}@[1] @[3] @[5]
send(:responder, {self(), :ping})defmodule Responder do
def run do
Process.register(self(), :responder)
receive do
{pid, :ping} when is_pid(pid) ->
send(pid, :pong)
{pid, anything} when is_pid(pid) ->
send(pid, "I received #{anything}.")
anything ->
IO.puts("Unexpected message received : #{anything}")
end
end
endspawn(Responder, :run, [])
send(:responder, "junk")
# Ще видим 'Unexpected message received : junk'- Понякога, обаче receive блокът за процес може да се променя с получаване на нови съобщения.
- Тогава не знаем какво ще бъде match-нато в бъдеще и е добре да си пазим не-match-натите съобщения.
Процеси очакващи множество съобщения

defmodule Responder do
def run do
wait()
wait()
end
defp wait do
receive do
{pid, :ping} when is_pid(pid) -> send(pid, :pong)
{pid, anything} when is_pid(pid) ->
send(pid, "I received #{anything}.")
anything ->
IO.puts("Unexpected message received : #{anything}")
end
end
end@[3-4] @[7-15]
pid = spawn(Responder, :run, [])
Process.alive?(pid)
#=> true
send(pid, {self(), "Hey!"})
receive do
msg -> IO.puts(msg)
end
Process.alive?(pid)
#=> true@[1-2] @[5] @[6-8] @[9]
send(pid, {self(), :ping})
receive do
msg -> IO.puts(msg)
end
# Ще видим 'pong'
Process.alive?(pid)
#=> false@[1] @[2-5] @[7-8]
defmodule Responder do
def run do
receive do
{pid, :ping} when is_pid(pid) ->
send(pid, :pong)
run()
{pid, anything} when is_pid(pid) ->
send(pid, "I received #{anything}.")
run()
anything ->
IO.puts("Unexpected message received : #{anything}.")
end
end
end@[4-6] @[7-9] @[10-11]
1..10
|> Enum.map(fn _ -> send(pid, {self(), :ping}) end)
|> Enum.each(fn _ -> receive do msg -> IO.puts(msg); end end)
# Ще видим 10 'pong'
send(pid, "Bye")
# Unexpected message received : Bye.
Process.alive?(pid)
#=> false@[1-5] @[7-8] @[9-10]
Използване на receive с timeout

defmodule Fibonacci do
def run do
receive do
{pid, n} when is_pid(pid) and is_number(n) and n > 0 ->
send(pid, nth(n))
{pid, _} when is_pid(pid) ->
send(pid, "I CAN'T COMPILE!")
anything ->
IO.puts(:stderr, "Bad query #{anything}")
end
run()
end
defp nth(1), do: 1
defp nth(2), do: 1
defp nth(n), do: nth(n - 1) + nth(n - 2)
end@[4-5] @[13-15] @[6-7] @[8-9] @[11]
pid = spawn(Fibonacci, :run, [])
send(pid, {self(), 10})
receive do msg -> IO.puts(msg); end
#=> 55@[1] @[3] @[4]
send(pid, {self(), 50})
receive do msg -> IO.puts(msg); end- Тази заявка отнема доста време.
- Текущият процес забива на receive.
send(pid, {self(), 50})
receive do
msg -> IO.puts(msg)
after 3000 ->
IO.puts("Tired of waiting. Bye!")
end@[4-5]
sleep = fn(time) ->
receive do
after time -> :ok
end
end
sleep.(2000)
# Текущият процес ще забие за 2 секунди.@[2-4] @[7-8]
defmodule Flusher do
def flush_it do
receive do
msg ->
IO.puts(msg)
flush_it()
after 0 -> :ok
end
end
end@[4-6] @[7]
pid = spawn(Fibonacci, :run, [])
1..10 |> Enum.each(fn n -> send(pid, {self(), n}) end)
Flusher.flush_itОсвобождаване на паметта на процесите (GC)

- Освобождаването на паметта (GC) за heap-овете на процесите е generational.
- Garbage Collector-ът разделя паметта на две поколения - старо и ново. |
- Това разделение е базирано на идеята, че ако обект в паметта остане след цикъл на GC-то, | то шансът да бъде премахнат скоро е малък.
- Новото поколение се състои от скоро-създадена информация.
- Старото от информация, която не е била премахната след даден брой цикли на GC. |
- Това разделени помага на GC, като се намаляват ненужни цикли върху старото поколение информация. |
Нека разгледаме алгоритъма на освобождение на паметта
- При процеси, които не използват heap, по-голям от min_heap_size.
- Няма освобождаване на паметта.
- Когато тези процеси 'умрат', цялата паметта е освободена от GC.
- При процес, който използва памет повече от min_heap_size.
- Този случай е валиден за процеси, които не живеят извънредно дълго.
- Процесът е създаден (spawn)
- fullsweep GC стратегия се използва първоначално, тъй като още нямаме ново и старо поколение.
- На този етап имаме нови и стари поколения, използва се generational GC.
- Повтаряме (3)
- Процесът се унищожава и паметта му е изчистена.
spawn -> fullsweep -> generational -> generational -> ... -> end
- Когато процес е активен за прекалено дълго време, fullsweep стратегията може да се активира отново.
- Това се случва след определен брой generational GC цикли.
- Има флаг за този брой fullsweep_after.
- Използва се брояч наречен minor_gcs.
Кога се изпълнява fullsweep GC?
- Когато процесът не е способен да си освободи достатъчно памет при нужда.
- Когато извикаме :erlang.garbage_collect(pid).
Process.info(pid, :garbage_collection)
{
:garbage_collection,
[
max_heap_size: %{error_logger: true, kill: true, size: 0},
min_bin_vheap_size: 46422, min_heap_size: 233,
fullsweep_after: 65535, minor_gcs: 0
]
}@[1] @[5-7]
- Когато се наложи fullsweep GC, защото няма достатъчно памет и се окаже, че GC не може да освободи нужната памет се увеличава heap-ът на процеса.
- В тази памет GC изчиства само обекти чиито референции са нула.
- Това е много бърза стратегия, защото е лесно да се отделят обектите за изчистване. |
- Точно в тази памет, обаче са възможни memory leaks по доста лесен начин. |
- Sub-binary опасност.
- Лек процес с ProcBin към голямо binary опасност. |










