supernova 0.0.1

Apache Pulsar client for Haskell

supernova

⚠️ This library is still very much under development 🚧, so use it at your own risk ⚠️

A supernova is a powerful and luminous stellar explosion. This transient astronomical event occurs during the last evolutionary stages of a massive star or when a white dwarf is triggered into runaway nuclear fusion. The original object, called the progenitor, either collapses to a neutron star or black hole, or is completely destroyed. The peak optical luminosity of a supernova can be comparable to that of an entire galaxy before fading over several weeks or months.

Quick Start

The example in test/Main.hs showcases a simple consumer and producer running concurrently (it requires the async library).

main :: IO ()
main = runPulsar resources $ \(Consumer {..}, Producer {..}) ->
  let c = forever $ fetch >>= \(Message i m) -> msgDecoder m >> ack i
      p = forever $ sleep 5 >> traverse_ produce messages
  in  concurrently_ c p

resources :: Pulsar (Consumer IO, Producer IO)
resources = do
  ctx      <- connect defaultConnectData
  consumer <- newConsumer ctx topic "test-sub"
  producer <- newProducer ctx topic
  return (consumer, producer)

A Message contains a MessageID you need for acking and a payload defined as a lazy ByteString.
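The Quick Start snippet relies on a few helpers (msgDecoder, messages and sleep) whose definitions are not shown in this README. The following is only a sketch of what they might look like, assuming the payloads are ASCII text carried as lazy ByteStrings and that produce accepts that same type, which is not confirmed here. All definitions below are hypothetical and use only base and bytestring.

{-# LANGUAGE ScopedTypeVariables #-}
import           Control.Concurrent         (threadDelay)
import qualified Data.ByteString.Lazy       as BL
import qualified Data.ByteString.Lazy.Char8 as CL

-- Hypothetical payloads; the real example may build its messages differently.
messages :: [BL.ByteString]
messages = map CL.pack ["foo", "bar", "taz"]

-- Pure decoding step: each byte becomes one character (ASCII/Latin-1 only).
decodePayload :: BL.ByteString -> String
decodePayload = CL.unpack

-- Hypothetical decoder: print the decoded payload to standard output.
msgDecoder :: BL.ByteString -> IO ()
msgDecoder = putStrLn . decodePayload

-- Hypothetical sleep: pause the current thread for the given number of seconds.
sleep :: Int -> IO ()
sleep n = threadDelay (n * 1000000)

main :: IO ()
main = mapM_ msgDecoder messages

Keeping the decoding step pure (decodePayload) makes it easy to test without a running Pulsar instance. A real application would more likely decode UTF-8 or a structured format such as JSON.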

Run it with the following command:

cabal new-run supernova-tests

By default, it logs to the standard output at the DEBUG level. You can change this by supplying LogOptions.

logOpts :: LogOptions
logOpts = LogOptions Info StdOut

runPulsar' logOpts resources

Streaming

Since both consumers and producers operate on any MonadIO m, we could leverage some streaming libraries. Here's the same example using streamly.

import           Streamly
import qualified Streamly.Prelude              as S

main :: IO ()
main = runPulsar resources $ \(Consumer {..}, Producer {..}) ->
  let c = forever $ fetch >>= \(Message i m) -> msgDecoder m >> ack i
      p = forever $ sleep 5 >> traverse_ produce messages
  in  S.drain . asyncly . maxThreads 10 $ S.yieldM c <> S.yieldM p
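To illustrate why being polymorphic in MonadIO m matters, here is a small sketch that does not use supernova at all. MockConsumer is a hypothetical stand-in, not the library's Consumer type. It is built once for any MonadIO m and then run inside StateT Int IO, the same way a consumer could be run inside a streaming monad.

import Control.Monad.IO.Class           (MonadIO, liftIO)
import Control.Monad.Trans.State.Strict (StateT, execStateT, modify')
import Data.IORef                       (IORef, atomicModifyIORef', newIORef)

-- Hypothetical stand-in for a consumer; NOT the supernova type.
newtype MockConsumer m = MockConsumer { fetchMock :: m (Maybe String) }

-- Build a consumer that pops messages from an in-memory queue.
-- It works in any monad that can lift IO actions.
mkMockConsumer :: MonadIO m => IORef [String] -> MockConsumer m
mkMockConsumer ref = MockConsumer $ liftIO $ atomicModifyIORef' ref pop
 where
  pop []       = ([], Nothing)
  pop (x : xs) = (xs, Just x)

-- Drain the consumer inside StateT, counting the messages seen.
countAll :: MockConsumer (StateT Int IO) -> StateT Int IO ()
countAll c = fetchMock c >>= \r -> case r of
  Nothing -> pure ()
  Just _  -> modify' (+ 1) >> countAll c

-- Run the counting loop over a fixed list of messages.
drainCount :: [String] -> IO Int
drainCount msgs = do
  ref <- newIORef msgs
  execStateT (countAll (mkMockConsumer ref)) 0

main :: IO ()
main = drainCount ["a", "b", "c"] >>= print

The caller picks the monad, so the same constructor serves plain IO, a state transformer, or a streaming type, provided it has a MonadIO instance.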

Development

It is recommended to use Cachix to reduce the compilation time.

nix-build

Alternatively, build within a Nix shell (run nix-shell at the project's root):

cabal new-build