gonzih / cats-nats   0.1.0

Apache License 2.0 GitHub

Cats Effect wrapper for Nats

Scala versions: 3.x

Cats went Nats

CI Release Snapshot
CI Release Artifacts Snapshot Artifacts

Cats Nats

Cats Effect friendly wrapper for Nats

Usage

Add cats-nats to your build.sbt libraryDependencies with

"io.github.gonzih" %% "cats-nats" % "0.1.0"

Then just use it:

import cats.effect.IOApp
import cats.effect.IO
import io.github.gonzih.nats.Nats

object Main extends IOApp.Simple {

  def run: IO[Unit] =
    val payload = "hello world"
    val stream = "my-persistent-stream"
    val subj = "my-subject"
    val durable = "my-durable-id"

    // connect to nats via Cats Effect Resource
    Nats
      .connect("nats://localhost:4222")
      .use({ case nc =>
        for
          // stream creation can be done externally or via cats-nats API
          _ <- nc.addStream(stream, subj)
          // create JetStream instance
          js <- nc.js
          // subscribe to subject on stream, this is backed by unbound Cats Effect Queue
          sub <- js.subscribe(stream, subj, durable, true)
          // publish your message
          _ <- js.publish(subj, payload.getBytes)
          // wait for message to be received
          msg <- sub.take
          // unsubscribe
          _ <- sub.unsubscribe
          // print result
          IO.println(String(msg.getData()))
        yield ()
      })
}

Key Value storage example:

import cats.effect.IOApp
import cats.effect.IO
import io.github.gonzih.nats.Nats

object Main extends IOApp.Simple {

  def run: IO[Unit] =
    val bucket = "kv-bucket"
    val key = "kv-object-key"
    val value = "object contents"

    // connect to nats via Cats Effect Resource
    Nats
      .connect("nats://localhost:4222")
      .use({ case nc =>
        for
          // create bucket
          __<- nc.kvManagement.create(bucket)
          // get bucket KV instance
          kv <- nc.kv(bucket)
          // create new object
          version <- kv.create(key, value.getBytes)
          // read object by key
          ve <- kv.get(key)
          // read content of an object
          v <- ve.value
          // delete object by key
          _ <- kv.delete(key)
          // purge deletes in bucket
          _ <- kv.purgeDeletes
          // delete bucket
          _ <- nc.kvManagement.delete(bucket)
          // print result
          _ <-_IO.println(String(v))
        yield ()
      })
}