Easily Create New Sync Types with ZIO STM
In this post I will share how I created a synchronisation primitive called Gate
that allows to potentially suspend execution by setting a boolean flag using a few lines of ZIO STM code.
Background
At Wix, we take testing seriously. Each commonly used micro-service supplies a testkit to its users.
Recently, I wrote a testkit that mimics our Kafka-client Sidecar (separate container that communicates with Kafka and implements additional client-side features).
The testkit simulates the Kafka broker by directly putting messages-to-be-produced (that the sidecar received) into fake consumer queues for further processing.
Requirement: Wait until message consumption is allowed to resume
One of the Kafka-client sidecar features is to globally pause message consumption (and then resume it). For the testkit logic I was required to implement the following API:
def pauseConsume(): UIO[Unit]
def resumeConsume(): UIO[Unit]
def suspendIfPaused(): UIO[Unit]
This required me to suspend and resume handling of produced messages in my testkit code according to the state of some flag, that was altered by a user request.
I wrote the testkit code in ZIO, which was a lot of fun!
ZIO is a great pure functional Scala library that is optimised for writing concurrent/asynchronous code.
Obviously I wanted to find if ZIO has the ability to synchronise between the pause request and the consumption suspension.
Does ZIO have the appropriate synchronisation primitive?
ZIO offers several synchronisation primitives such as a Semaphore, Promise, Ref, etc.
First, I thought I would use ZIO Ref with a boolean flag:suspendConsumption <- Ref.make(false)
and then, on incoming produced message, check if consumption needs to be suspended:ZIO.WhenM(suspendConsumption.get)
But there is a problem — While Ref
allows safe multithreaded (multi-fibered in ZIO
’s case) access,ZIO.WhenM
does not suspend the executing fiber in case the boolean expression is false.
Next, I looked at ZIO Semaphore but while the Semaphore
does allow suspension withdef withPermit[R, E, A](task: ZIO[R, E, A])
,
it does not allow to have separate actors, where one controls the state change (e.g. pause request) and the other executes the check-state-and-maybe-suspend effect.
Finally, I tried Promise. Promise
looked promising :). It offers a way to pause consumption, simply by creating the promise:consumersArePausedPromise <- Promise.make[Nothing, Unit]
a maybe-suspend method:consumersArePausedPromise.await
and a resume method: consumersArePausedPromise.succeed(())
but you can only perform this once - not multiple times. Once the Promise
is completed, it can never be suspended on again.
ZIO STM to the rescue
While it seemed like ZIO has failed me for the first time as there were no ZIO synchronisation primitives that exactly matched my requirement, colleagues whom I consulted advised me to use ZIO STM as the building block for my own custom Gate functionality that will comply with my requirements.
STM stands for Software Transactional Memory. According to its scaladoc:
STM[E,A]
represents an effect that can be performed transactionally, resulting in a failure E
or a value A
.
…
Software Transactional Memory is a technique which allows composition of arbitrary atomic operations. It is the software analog of transactions in database systems.
For more information on STM check out this great article by Fredrik Skogberg.
The generalised API
The consumption pause/resume/suspendIdNotAllowed requirement can be generalised to a Gate synchronisation primitive (Some of you may be familiar with ManualResetEvent in C#).
Just replace pauseConsume
with lock
, resumeConsume
with unlock
, and suspendIfNotAllowed
with waitIfLocked
The API implementation
Below you can see how Gate
was implemented using ZIO STM. Each method is implemented with a STM atomic transaction. It only takes a single line of code!
.commit
is just syntactic sugar for wrapping the effect in STM.atomically
.
So signal.set(false).commit
is identical to
STM.atomically {
signal.set(false)
}
Notice that it uses STM’s TRef[Boolean]
. The difference between TRef
and Ref
is that you can compose TRef
withSTM.check
which will suspend the effect and will retry only once the TRef value is changed.
So now with STM’s TRef
we have a boolean state ref that can be suspended on, unlike the regular ZIO Ref
.
Using the Gate
Below you can see a snippet of the testkit’s FakeSidecarService
code that sets pause/resume consumption using the Gate
:
And also a snippet of the testkit’s FakeConsumerHandler
code that maybe-suspends consumption in case of a pause request using the Gate
:
Unlike most Java/Scala synchronisation primitives, the executing thread is not blocked by this effect suspension because ZIO is implemented using lightweight green threads (Fibers).
Summary
In this post I showed how easy it is to create additional synchronization primitives in a few lines of ZIO code and how powerful STM can be even for very simple atomic operations.
Thank you for reading!
If you’d like to get updates on my experiences with ZIO, follow me on Twitter and Medium.
You can also visit my website, where you will find my previous blog posts, talks I gave in conferences and open-source projects I’m involved with.
If anything is unclear or you want to point out something, please comment down below.