Txn

Overview

Txn is a monad which describes transactions involving TVars. It is executed via STM#commit:

import cats.implicits._
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import io.github.timwspence.cats.stm.STM

val stm = STM.runtime[IO].unsafeRunSync()
// stm: STM[IO] = io.github.timwspence.cats.stm.STM$Make$$anon$1$$anon$2@38c6fe20
import stm._

val prog: IO[(Int, Int)] = for {
  to   <- stm.commit(TVar.of(0))
  from <- stm.commit(TVar.of(100))
  _  <- stm.commit {
    for {
      balance <- from.get
      _       <- from.modify(_ - balance)
      _       <- to.modify(_ + balance)
    } yield ()
  }
  v   <- stm.commit((to.get, from.get).tupled)
} yield v
// prog: IO[(Int, Int)] = FlatMap(
//   ioe = FlatMap(
//     ioe = Uncancelable(
//       body = cats.effect.IO$$$Lambda$10470/0x0000000802dbc840@934e70e,
//       event = cats.effect.tracing.TracingEvent$StackTrace
//     ),
//     f = io.github.timwspence.cats.stm.STM$Make$$anon$1$$anon$2$$Lambda$10471/0x0000000802dbb840@5751d25b,
//     event = cats.effect.tracing.TracingEvent$StackTrace
//   ),
//   f = <function1>,
//   event = cats.effect.tracing.TracingEvent$StackTrace
// )

val result = prog.unsafeRunSync()
// result: (Int, Int) = (100, 0)

Retries

STM#commit supports the concept of retries, which can be introduced via STM.check:

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import io.github.timwspence.cats.stm.STM

val stm = STM.runtime[IO].unsafeRunSync()
// stm: STM[IO] = io.github.timwspence.cats.stm.STM$Make$$anon$1$$anon$2@4c55bfab
import stm._

val to   = stm.commit(TVar.of(1)).unsafeRunSync()
// to: TVar[Int] = io.github.timwspence.cats.stm.STMLike$TVar@85bce54
val from = stm.commit(TVar.of(0)).unsafeRunSync()
// from: TVar[Int] = io.github.timwspence.cats.stm.STMLike$TVar@7910de7

val txn: IO[Unit]  = stm.commit {
  for {
    balance <- from.get
    _       <- stm.check(balance > 100)
    _       <- from.modify(_ - 100)
    _       <- to.modify(_ + 100)
  } yield ()
}
// txn: IO[Unit] = FlatMap(
//   ioe = Uncancelable(
//     body = cats.effect.IO$$$Lambda$10470/0x0000000802dbc840@34459dda,
//     event = cats.effect.tracing.TracingEvent$StackTrace
//   ),
//   f = io.github.timwspence.cats.stm.STM$Make$$anon$1$$anon$2$$Lambda$10471/0x0000000802dbb840@10caee7c,
//   event = cats.effect.tracing.TracingEvent$StackTrace
// )

txn.unsafeRunSync() will block until the transaction succeeds (or throws an exception!). Internally, this is implemented by keeping track of which TVars are involved in a transaction and retrying any pending transactions every time a TVar is committed.

OrElse

STM.orElse is built on top of the retry logic and allows you to attempt an alternative action if the first retries:

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import io.github.timwspence.cats.stm.STM

val stm = STM.runtime[IO].unsafeRunSync()
// stm: STM[IO] = io.github.timwspence.cats.stm.STM$Make$$anon$1$$anon$2@33b336fe
import stm._

val to   = stm.commit(TVar.of(1)).unsafeRunSync()
// to: TVar[Int] = io.github.timwspence.cats.stm.STMLike$TVar@328d905b
val from = stm.commit(TVar.of(0)).unsafeRunSync()
// from: TVar[Int] = io.github.timwspence.cats.stm.STMLike$TVar@3526e2d7

val transferHundred: Txn[Unit] = for {
  b <- from.get
  _ <- stm.check(b > 100)
  _ <- from.modify(_ - 100)
  _ <- to.modify(_ + 100)
} yield ()
// transferHundred: Txn[Unit] = Bind(
//   txn = Get(tvar = io.github.timwspence.cats.stm.STMLike$TVar@3526e2d7),
//   f = <function1>
// )

val transferRemaining: Txn[Unit] = for {
  balance <- from.get
  _       <- from.modify(_ - balance)
  _       <- to.modify(_ + balance)
} yield ()
// transferRemaining: Txn[Unit] = Bind(
//   txn = Get(tvar = io.github.timwspence.cats.stm.STMLike$TVar@3526e2d7),
//   f = <function1>
// )

val txn  = for {
  _    <- transferHundred.orElse(transferRemaining)
  f    <- from.get
  t    <- to.get
} yield f -> t
// txn: Txn[(Int, Int)] = Bind(
//   txn = OrElse(
//     txn = Bind(
//       txn = Get(tvar = io.github.timwspence.cats.stm.STMLike$TVar@3526e2d7),
//       f = <function1>
//     ),
//     fallback = Bind(
//       txn = Get(tvar = io.github.timwspence.cats.stm.STMLike$TVar@3526e2d7),
//       f = <function1>
//     )
//   ),
//   f = <function1>
// )

val result = stm.commit(txn).unsafeRunSync()
// result: (Int, Int) = (0, 1)

Aborting

Transactions can be aborted via STM.abort:

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import io.github.timwspence.cats.stm.STM

val stm = STM.runtime[IO].unsafeRunSync()
// stm: STM[IO] = io.github.timwspence.cats.stm.STM$Make$$anon$1$$anon$2@4c635a9e
import stm._

val to   = stm.commit(TVar.of(1)).unsafeRunSync()
// to: TVar[Int] = io.github.timwspence.cats.stm.STMLike$TVar@4c5a89ba
val from = stm.commit(TVar.of(0)).unsafeRunSync()
// from: TVar[Int] = io.github.timwspence.cats.stm.STMLike$TVar@6ecbd07a

val txn  = for {
  balance <- from.get
  _       <- if (balance < 100)
               stm.abort(new RuntimeException("Balance must be at least 100"))
             else
               stm.unit
  _ <- from.modify(_ - 100)
  _ <- to.modify(_ + 100)
} yield ()
// txn: Txn[Unit] = Bind(
//   txn = Get(tvar = io.github.timwspence.cats.stm.STMLike$TVar@6ecbd07a),
//   f = <function1>
// )

val result = stm.commit(txn).attempt.unsafeRunSync()
// result: Either[Throwable, Unit] = Left(
//   value = java.lang.RuntimeException: Balance must be at least 100
// )

Note that aborting a transaction will not modify any of the TVars involved.