Txn
Overview
Txn is a monad that describes transactions involving TVars. A Txn is executed via STM#commit:
import cats.implicits._
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import io.github.timwspence.cats.stm.STM
val stm = STM.runtime[IO].unsafeRunSync()
// stm: STM[IO] = io.github.timwspence.cats.stm.STM$Make$$anon$1$$anon$2@32f2eef3
import stm._
val prog: IO[(Int, Int)] = for {
  to   <- stm.commit(TVar.of(0))
  from <- stm.commit(TVar.of(100))
  _ <- stm.commit {
    for {
      balance <- from.get
      _       <- from.modify(_ - balance)
      _       <- to.modify(_ + balance)
    } yield ()
  }
  v <- stm.commit((to.get, from.get).tupled)
} yield v
// prog: IO[(Int, Int)] = FlatMap(
// ioe = FlatMap(
// ioe = Uncancelable(
// body = cats.effect.IO$$$Lambda$10493/0x0000000802d50840@30ad9e8,
// event = cats.effect.tracing.TracingEvent$StackTrace
// ),
// f = io.github.timwspence.cats.stm.STM$Make$$anon$1$$anon$2$$Lambda$10494/0x0000000802d51040@777decd6,
// event = cats.effect.tracing.TracingEvent$StackTrace
// ),
// f = <function1>,
// event = cats.effect.tracing.TracingEvent$StackTrace
// )
val result = prog.unsafeRunSync()
// result: (Int, Int) = (100, 0)
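Because Txn is a monad, smaller transactions can be sequenced into a larger one before anything is committed. The following is a minimal sketch of that idea, not part of the library: the transfer helper and the three-account setup are hypothetical names introduced for illustration, and only calls already shown above (TVar.of, get, modify, commit) are used.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import io.github.timwspence.cats.stm.STM

val stm = STM.runtime[IO].unsafeRunSync()
import stm._

// Hypothetical helper (not part of cats-stm): describes moving `amount`
// from one account to another. Nothing runs until the Txn is committed.
def transfer(from: TVar[Int], to: TVar[Int], amount: Int): Txn[Unit] =
  for {
    _ <- from.modify(_ - amount)
    _ <- to.modify(_ + amount)
  } yield ()

val prog: IO[(Int, Int, Int)] = for {
  a <- stm.commit(TVar.of(100))
  b <- stm.commit(TVar.of(0))
  c <- stm.commit(TVar.of(0))
  // Two transfers are sequenced into a single Txn and committed together
  _ <- stm.commit(transfer(a, b, 60).flatMap(_ => transfer(b, c, 20)))
  v <- stm.commit(for { x <- a.get; y <- b.get; z <- c.get } yield (x, y, z))
} yield v

val result = prog.unsafeRunSync()
// result: (Int, Int, Int) = (40, 40, 20)
```

Keeping transfer as a plain Txn value lets the caller decide where the transaction boundary lies, since only stm.commit turns a Txn into an IO.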
Retries
STM#commit supports retries, which can be introduced via STM.check:
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import io.github.timwspence.cats.stm.STM
val stm = STM.runtime[IO].unsafeRunSync()
// stm: STM[IO] = io.github.timwspence.cats.stm.STM$Make$$anon$1$$anon$2@6c79830a
import stm._
val to = stm.commit(TVar.of(1)).unsafeRunSync()
// to: TVar[Int] = io.github.timwspence.cats.stm.STMLike$TVar@239e47d3
val from = stm.commit(TVar.of(0)).unsafeRunSync()
// from: TVar[Int] = io.github.timwspence.cats.stm.STMLike$TVar@2157f976
val txn: IO[Unit] = stm.commit {
  for {
    balance <- from.get
    _       <- stm.check(balance > 100)
    _       <- from.modify(_ - 100)
    _       <- to.modify(_ + 100)
  } yield ()
}
// txn: IO[Unit] = FlatMap(
// ioe = Uncancelable(
// body = cats.effect.IO$$$Lambda$10493/0x0000000802d50840@3f840d29,
// event = cats.effect.tracing.TracingEvent$StackTrace
// ),
// f = io.github.timwspence.cats.stm.STM$Make$$anon$1$$anon$2$$Lambda$10494/0x0000000802d51040@cfd63e7,
// event = cats.effect.tracing.TracingEvent$StackTrace
// )
txn.unsafeRunSync() will block until the transaction succeeds (or throws an exception!). In the example above, from holds 0, so the call blocks until some other transaction raises its balance above 100. Internally, this is implemented by keeping track of which TVars are involved in a transaction and retrying any pending transactions every time a TVar is committed.
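The blocking behaviour can be observed by running the checked transaction on its own fiber and committing a deposit from another. This is a sketch under assumptions: the deposit of 150 and the use of Cats Effect's start and joinWithNever are choices made for this illustration, and the stm calls are the same ones used above.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import io.github.timwspence.cats.stm.STM

val stm = STM.runtime[IO].unsafeRunSync()
import stm._

val prog: IO[(Int, Int)] = for {
  from <- stm.commit(TVar.of(0))
  to   <- stm.commit(TVar.of(1))
  // Same transaction as above: it cannot pass the check while from holds 0
  withdraw = for {
    balance <- from.get
    _       <- stm.check(balance > 100)
    _       <- from.modify(_ - 100)
    _       <- to.modify(_ + 100)
  } yield ()
  // Run the transaction in the background so this fiber is free to deposit
  fiber <- stm.commit(withdraw).start
  // Committing a change to from lets the pending transaction be retried
  _ <- stm.commit(from.modify(_ + 150))
  _ <- fiber.joinWithNever
  v <- stm.commit(for { f <- from.get; t <- to.get } yield (f, t))
} yield v

val result = prog.unsafeRunSync()
// result: (Int, Int) = (50, 101)
```

The final state is the same whether the background fiber reaches the check before or after the deposit: in the first case it waits and is retried, in the second the check passes immediately.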
OrElse
STM.orElse is built on top of the retry logic and allows you to attempt an alternative action if the first one retries:
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import io.github.timwspence.cats.stm.STM
val stm = STM.runtime[IO].unsafeRunSync()
// stm: STM[IO] = io.github.timwspence.cats.stm.STM$Make$$anon$1$$anon$2@48423ade
import stm._
val to = stm.commit(TVar.of(1)).unsafeRunSync()
// to: TVar[Int] = io.github.timwspence.cats.stm.STMLike$TVar@40ed554a
val from = stm.commit(TVar.of(0)).unsafeRunSync()
// from: TVar[Int] = io.github.timwspence.cats.stm.STMLike$TVar@4a015bc6
val transferHundred: Txn[Unit] = for {
  b <- from.get
  _ <- stm.check(b > 100)
  _ <- from.modify(_ - 100)
  _ <- to.modify(_ + 100)
} yield ()
// transferHundred: Txn[Unit] = Bind(
// txn = Get(tvar = io.github.timwspence.cats.stm.STMLike$TVar@4a015bc6),
// f = <function1>
// )
val transferRemaining: Txn[Unit] = for {
  balance <- from.get
  _       <- from.modify(_ - balance)
  _       <- to.modify(_ + balance)
} yield ()
// transferRemaining: Txn[Unit] = Bind(
// txn = Get(tvar = io.github.timwspence.cats.stm.STMLike$TVar@4a015bc6),
// f = <function1>
// )
val txn = for {
  _ <- transferHundred.orElse(transferRemaining)
  f <- from.get
  t <- to.get
} yield f -> t
// txn: Txn[(Int, Int)] = Bind(
// txn = OrElse(
// txn = Bind(
// txn = Get(tvar = io.github.timwspence.cats.stm.STMLike$TVar@4a015bc6),
// f = <function1>
// ),
// fallback = Bind(
// txn = Get(tvar = io.github.timwspence.cats.stm.STMLike$TVar@4a015bc6),
// f = <function1>
// )
// ),
// f = <function1>
// )
val result = stm.commit(txn).unsafeRunSync()
// result: (Int, Int) = (0, 1)
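In the run above the check fails, so the fallback transfers the remaining balance of 0. For contrast, here is a sketch of the opposite case, assuming a starting balance of 500 (a value chosen for this illustration): the check passes, the first transaction completes, and the fallback is not used.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import io.github.timwspence.cats.stm.STM

val stm = STM.runtime[IO].unsafeRunSync()
import stm._

val prog: IO[(Int, Int)] = for {
  to   <- stm.commit(TVar.of(1))
  from <- stm.commit(TVar.of(500)) // assumed balance, large enough to pass the check
  transferHundred = for {
    b <- from.get
    _ <- stm.check(b > 100)
    _ <- from.modify(_ - 100)
    _ <- to.modify(_ + 100)
  } yield ()
  transferRemaining = for {
    balance <- from.get
    _       <- from.modify(_ - balance)
    _       <- to.modify(_ + balance)
  } yield ()
  // The first branch does not retry here, so the fallback is not attempted
  _ <- stm.commit(transferHundred.orElse(transferRemaining))
  v <- stm.commit(for { f <- from.get; t <- to.get } yield (f, t))
} yield v

val result = prog.unsafeRunSync()
// result: (Int, Int) = (400, 101)
```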
Aborting
Transactions can be aborted via STM.abort:
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import io.github.timwspence.cats.stm.STM
val stm = STM.runtime[IO].unsafeRunSync()
// stm: STM[IO] = io.github.timwspence.cats.stm.STM$Make$$anon$1$$anon$2@4f9c7d38
import stm._
val to = stm.commit(TVar.of(1)).unsafeRunSync()
// to: TVar[Int] = io.github.timwspence.cats.stm.STMLike$TVar@7035068e
val from = stm.commit(TVar.of(0)).unsafeRunSync()
// from: TVar[Int] = io.github.timwspence.cats.stm.STMLike$TVar@5a26317e
val txn = for {
  balance <- from.get
  _ <- if (balance < 100)
    stm.abort(new RuntimeException("Balance must be at least 100"))
  else
    stm.unit
  _ <- from.modify(_ - 100)
  _ <- to.modify(_ + 100)
} yield ()
// txn: Txn[Unit] = Bind(
// txn = Get(tvar = io.github.timwspence.cats.stm.STMLike$TVar@5a26317e),
// f = <function1>
// )
val result = stm.commit(txn).attempt.unsafeRunSync()
// result: Either[Throwable, Unit] = Left(
// value = java.lang.RuntimeException: Balance must be at least 100
// )
Note that aborting a transaction will not modify any of the TVars involved.
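A small sketch of what this means in practice, assuming a hypothetical counter TVar and an exception message invented for this illustration: a modification made earlier in the same transaction is discarded when the transaction aborts.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import io.github.timwspence.cats.stm.STM

val stm = STM.runtime[IO].unsafeRunSync()
import stm._

val prog: IO[(Boolean, Int)] = for {
  counter <- stm.commit(TVar.of(0)) // hypothetical TVar for this sketch
  // Modify the TVar, then abort within the same transaction
  failing = for {
    _ <- counter.modify(_ + 1)
    _ <- (stm.abort(new RuntimeException("boom")): Txn[Unit])
  } yield ()
  outcome <- stm.commit(failing).attempt
  // Read the TVar in a fresh transaction to see what was actually committed
  v <- stm.commit(counter.get)
} yield (outcome.isLeft, v)

val result = prog.unsafeRunSync()
// result: (Boolean, Int) = (true, 0)
```

The increment is never visible outside the aborted transaction, so other transactions reading counter still see 0.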