Combine CLI Tools with Elixir and Cizen

Ryo33
Cizen
Published in
4 min read · Nov 11, 2018

--

Cizen is a library for building concurrent applications out of a collection of automata that communicate with each other through events. In this post, we are going to create an application that processes images concurrently while conserving CPU resources. In addition, we’ll do it by combining existing CLI tools.

To set up the development environment, create a new Mix project:

mix new cizen_example
cd cizen_example

and add cizen into your deps in mix.exs:

defp deps do
[
# Cizen: the automata/event library this example is built on
{:cizen, "~> 0.12.5"}
]
end

Then, run mix deps.get.

Also, we prepare the following mock CLI tools, get-cpu-usage.sh and process-image.sh.

get-cpu-usage.sh is a CLI tool that prints the current CPU usage:

# Mock CPU probe: prints the CPU usage as a bare integer on stdout.
echo 100 # We'll edit this value directly.

process-image.sh is a CLI tool to process the given image, and we mock the image processing by sleeping for a while:

# Mock image processing: sleeps 1-10 seconds.
# Bash arithmetic expansion picks the duration, so no external `bc` is needed.
sleep $(( RANDOM % 10 + 1 ))

Now we’re ready to combine them with Cizen.

In the Cizen way of developing applications, we should list the automata first. I think we need three automata: ImageProcessor, which processes an image with process-image.sh; CPUMonitor, which monitors CPU usage with get-cpu-usage.sh; and Scheduler, which schedules the concurrent image-processing tasks.

Next, we should list the events that the automata use. I think we need UpdateCPUUsage, to notify that the CPU usage has changed, and Processed, to notify that an image has been processed.

Then, we can write the following code into image-processing.exs.

# Events

defmodule UpdateCPUUsage do
  @moduledoc """
  Event dispatched when the observed CPU usage changes.

  `value` carries the new usage reading.
  """
  defstruct [:value]
end
defmodule Processed do
  @moduledoc """
  Event announcing that an image has been processed; `image` identifies it.
  """
  defstruct image: nil
end

These events are structs.

defmodule ImageProcessor do
  @moduledoc """
  Automaton that processes a single image with `process-image.sh`.

  Start it with `%ImageProcessor{image: "image-to-process.png"}`. It runs the
  script once, dispatches a `Processed` event for that image, and finishes.
  """
  use Cizen.Automaton
  defstruct [:image]

  alias Cizen.Effects.Dispatch

  @impl true
  def spawn(_id, %__MODULE__{image: image}) do
    # The image name is the only state this automaton carries.
    image
  end

  @impl true
  def yield(id, image) do
    IO.puts("start processing: #{image}")

    # Run process-image.sh; the match raises on a non-zero exit status.
    {_, 0} = System.cmd("bash", ["process-image.sh", image])

    # Notify the image is processed
    perform id, %Dispatch{
      body: %Processed{image: image}
    }

    IO.puts("done: #{image}")

    # Finish itself
    Cizen.Automaton.finish()
  end
end

This is the ImageProcessor automaton, and we can specify an image to process by using the struct like %ImageProcessor{image: "image-to-process.png"}.

defmodule CPUMonitor do
  @moduledoc """
  Automaton that polls `get-cpu-usage.sh` about once per second.

  Its state is the last value it read (initially `nil`). Whenever a new
  reading differs from that state, it dispatches an `UpdateCPUUsage` event.
  """
  use Cizen.Automaton
  defstruct []

  alias Cizen.Effects.Dispatch

  @impl true
  def spawn(_id, %__MODULE__{}) do
    # Initial state: no reading yet, so the first reading always counts as a change.
    nil
  end

  @impl true
  def yield(id, last_value) do
    # Run get-cpu-usage.sh; the match raises on a non-zero exit status.
    {data, 0} = System.cmd("bash", ["get-cpu-usage.sh"])

    # Parse the data as an integer value
    value =
      data
      |> String.trim()
      |> String.to_integer()

    # If changed
    if value != last_value do
      IO.puts("cpu usage: #{value}")

      # Notify the change of CPU usage
      perform id, %Dispatch{
        body: %UpdateCPUUsage{
          value: value
        }
      }
    end

    # Poll interval
    :timer.sleep(1000)

    # Next state
    value
  end
end

This is the CPUMonitor automaton. It repeatedly runs get-cpu-usage.sh at one-second intervals and notifies other automata of any change.

defmodule Scheduler do
  @moduledoc """
  Automaton that starts one `ImageProcessor` per image while limiting how
  many run at once.

  The state tracks the remaining images, the number of resources currently
  lent out, and the current maximum. The maximum is adjusted from
  `UpdateCPUUsage` events; a `Processed` event returns one resource.
  """
  use Cizen.Automaton
  defstruct [:images]

  alias Cizen.{Event, Filter}
  alias Cizen.Effects.{Start, Subscribe, Receive}

  @impl true
  def spawn(id, %__MODULE__{images: images}) do
    # Subscribe UpdateCPUUsage and Processed
    perform id, %Subscribe{event_filter: relevant_events()}

    # Initial state: nothing lent yet, and a single resource to begin with.
    %{images: images, lent_resources: 0, max_resources: 1}
  end

  @impl true
  # Finish itself when there is no image to process
  def yield(_id, %{images: []}), do: Cizen.Automaton.finish()

  def yield(id, %{images: [next_image | remaining]} = state) do
    # Blocks until a resource is available, then hands back the updated state.
    state = borrow_resource(id, %{state | images: remaining})

    # Start to process the image
    perform id, %Start{saga: %ImageProcessor{image: next_image}}

    # Next state
    state
  end

  # Wait until a resource is available, then return the state with it lent out.
  defp borrow_resource(id, state) do
    %{lent_resources: lent, max_resources: limit} = state

    if lent < limit do
      %{state | lent_resources: lent + 1}
    else
      # Nothing to lend: wait for the next event, fold it in, and retry.
      received = perform id, %Receive{event_filter: relevant_events()}
      borrow_resource(id, apply_event(state, received.body))
    end
  end

  # Filter matching the two event types the scheduler reacts to.
  defp relevant_events do
    Filter.any([
      Filter.new(fn %Event{body: %UpdateCPUUsage{}} -> true end),
      Filter.new(fn %Event{body: %Processed{}} -> true end)
    ])
  end

  # A CPU usage update changes the resource limit.
  defp apply_event(state, %UpdateCPUUsage{value: value}) do
    %{state | max_resources: max_resources_for(value)}
  end

  # A processed image gives one lent resource back.
  defp apply_event(state, %Processed{}) do
    %{state | lent_resources: state.lent_resources - 1}
  end

  # Maps a CPU usage reading to the number of concurrent processors allowed.
  defp max_resources_for(usage) when usage < 20, do: 16
  defp max_resources_for(usage) when usage < 40, do: 8
  defp max_resources_for(usage) when usage < 60, do: 4
  defp max_resources_for(usage) when usage < 80, do: 2
  defp max_resources_for(_usage), do: 1
end

It’s long, but simple. In yield/2, it starts an ImageProcessor after calling borrow_resource/2, and borrow_resource/2 waits until a resource is available. The number of available resources is updated according to the CPU usage, and this is the mechanism that conserves our machine’s resources.

defmodule Main do
  @moduledoc """
  Entry point of the example: starts `CPUMonitor` and `Scheduler`, then
  waits for one `Processed` event per image.
  """
  use Cizen.Effectful # to use handle/1
  alias Cizen.{Event, Filter}
  alias Cizen.Effects.{Start, Subscribe, Receive}

  @doc """
  Processes `image1.png` through `image50.png` and returns once a `Processed`
  event has been received for each of them.
  """
  def main do
    # Images to process
    images = for n <- 1..50, do: "image#{n}.png"

    handle fn id ->
      # Subscribe first so that no Processed event is missed.
      perform id, %Subscribe{
        event_filter: Filter.new(fn %Event{body: %Processed{}} -> true end)
      }

      # Start monitoring CPU
      perform id, %Start{saga: %CPUMonitor{}}

      # Start scheduler
      perform id, %Start{saga: %Scheduler{images: images}}

      # Wait until all images are processed: one Receive per image.
      Enum.map(images, fn _image -> perform(id, %Receive{}) end)
    end
  end
end
# Run: kick off the example when this script is evaluated.
Main.main()

This is the main module. It starts CPUMonitor and Scheduler and waits until all the images are processed.

Now, we’re able to run the program;

mix run image-processing.exs

and to change the CPU usage manually while the above command is running:

echo "echo 10" > get-cpu-usage.sh 

Here is the example execution of it:

The full code for this post is here.

--

--