@hackage streaming 0.1.0.17

a free monad transformer optimized for streaming applications

The free stream on a streamable functor

Stream can be used wherever FreeT is used. The compiler's standard range of optimizations works better for operations written in terms of Stream. FreeT f m r / Stream f m r is of course extremely general, and many functor-general combinators are exported by the general module Streaming.

Streaming.Prelude is focused on elementary streaming applications. Here the free iteration of the 'base' functors (readings of the f in Stream f m r) expresses forms of effectful sequence or succession. Some of the types in question appear in the streaming IO libraries under titles like

pipes:      Producer a m r, Producer a m (Producer a m r), FreeT (Producer a m) m r
io-streams: InputStream a, Generator a r
conduit:    Source m a, ConduitM () o m r

Streaming.Prelude closely follows Pipes.Prelude, but cleverly omits the pipes:

ghci> S.stdoutLn $ S.take 2 S.stdinLn
let's<Enter>
let's
stream<Enter>
stream

And here we do a little connect and resume, as the streaming-io experts call it:

ghci> rest <- S.print $ S.splitAt 3 $ S.each [1..10]
1
2
3
ghci> S.sum rest
49

Somehow, we didn't even need a four-character operator for that, nor advice about best practices: just ordinary Haskell common sense.
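To make the connect-and-resume example above concrete without installing anything, here is a minimal sketch that depends only on base. The Stream and Of types are simplified stand-ins written for this illustration, and the names splitAt', printS and sumS are hypothetical; the library's own definitions may differ in detail. The point it tries to show is that the unconsumed remainder is simply the return value of the split stream.

```haskell
module Main where

-- Simplified stand-in for the library's Stream type (an assumption of this sketch).
data Stream f m r
  = Step (f (Stream f m r))   -- one functor layer wrapping the rest of the stream
  | Effect (m (Stream f m r)) -- an action that produces the rest of the stream
  | Return r                  -- end of the stream, carrying a final result

-- Left-strict pair: one element together with whatever follows it.
data Of a b = !a :> b

-- Yield each element of a list in order.
each :: [a] -> Stream (Of a) m ()
each = foldr (\a rest -> Step (a :> rest)) (Return ())

-- Pass along the first n elements; the untouched remainder becomes the result.
splitAt' :: Monad m => Int -> Stream (Of a) m r -> Stream (Of a) m (Stream (Of a) m r)
splitAt' n str
  | n <= 0 = Return str
  | otherwise = case str of
      Return r -> Return (Return r)
      Effect m -> Effect (fmap (splitAt' n) m)
      Step (a :> rest) -> Step (a :> splitAt' (n - 1) rest)

-- Print every element, then hand back the stream's final result.
printS :: Show a => Stream (Of a) IO r -> IO r
printS (Return r) = return r
printS (Effect m) = m >>= printS
printS (Step (a :> rest)) = print a >> printS rest

-- Strictly sum the elements, discarding the final result.
sumS :: (Monad m, Num a) => Stream (Of a) m r -> m a
sumS = go 0
  where
    go acc (Return _) = return acc
    go acc (Effect m) = m >>= go acc
    go acc (Step (a :> rest)) =
      let acc' = acc + a
       in acc' `seq` go acc' rest

main :: IO ()
main = do
  rest <- printS (splitAt' 3 (each [1 .. 10 :: Int])) -- prints 1, 2, 3
  total <- sumS rest                                  -- sums 4 through 10
  print total                                         -- prints 49
```

Nothing here is specific to lists or to IO: the "resume" half works because printS returns whatever the stream returns, and in this case that is another stream.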

Didn't I hear that free monads are horrible?

If Stream f m r is instantiated to Stream f Identity r then we have the standard free monad construction. This is subject to certain familiar objections from an efficiency perspective; efforts have been made to substitute exotic cps-ed implementations and so forth.

In fact, the standard fast talk about retraversing binds and quadratic explosions and costly appends, and so on, becomes transparent nonsense with Stream f m r in its streaming use. The insight needed to see this is basically nil: where m is read as IO, or some transformed IO, the dreaded retraversing of the binds in a stream expression would involve repeating all the past actions. Don't worry, to get e.g. the second chunk of bytes from a handle, you won't need to start over and get the first one again! The first chunk has vanished into an unrepeatable past.

All of the difficulties a streaming library is attempting to avoid are concentrated in the deep irrationality of

sequence :: (Monad m, Traversable t) => t (m a) -> m (t a)

In the streaming context, this becomes

sequence :: (Monad m, Functor f) => Stream f m r -> Stream f m r
sequence = id

It is of course easy enough to define

accumulate :: (Monad m, Functor f) => Stream f m r -> m (Stream f Identity r)

or reifyBinds, as you might call it. A little experience will teach the user how to avoid or control the sort of accumulation characteristic of sequence in its various guises, e.g. mapM f = sequence . map f and traverse f = sequence . fmap f and replicateM n = sequence . replicate n. See for example the types of

Control.Monad.replicateM :: Int -> m a -> m [a]
Streaming.Prelude.replicateM :: Int -> m a -> Stream (Of a) m ()

If you want to tempt fate and replicate the irrationality of Control.Monad.replicateM, then sure, you can write the hermaphroditic, chimerical

accumulate . Streaming.Prelude.replicateM :: Int -> m a -> m (Stream (Of a) Identity ())

but once you know how to operate with a stream directly you will see less and less point in what is called extracting the (structured) value from IO. With sequence and traverse, we accumulate a structure holding pure values from a structure holding monadic values. Why bother when you have intrinsically monadic structures? Stream f m r gives you an immense body of such structures and a simple discipline for working with them.
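For readers who want to see what the accumulation discussed above amounts to, here is a minimal sketch using only base. The Stream and Of types are simplified stand-ins, and accumulateOf, replicateM' and toListPure are hypothetical names chosen for this illustration. The sketch specializes f to Of a, where the effects can be run element by element; for an arbitrary f, pushing the effects out past each layer would additionally need some way to traverse f.

```haskell
module Main where

import Data.Functor.Identity (Identity (..))
import Data.IORef (modifyIORef', newIORef, readIORef)

-- Simplified stand-in for the library's Stream type (an assumption of this sketch).
data Stream f m r
  = Step (f (Stream f m r))
  | Effect (m (Stream f m r))
  | Return r

-- Left-strict pair: one element together with whatever follows it.
data Of a b = !a :> b

-- Run every effect up front, leaving a stream with no effects left in it.
-- This is the same kind of accumulation that sequence performs.
accumulateOf :: Monad m => Stream (Of a) m r -> m (Stream (Of a) Identity r)
accumulateOf (Return r) = return (Return r)
accumulateOf (Effect m) = m >>= accumulateOf
accumulateOf (Step (a :> rest)) = do
  rest' <- accumulateOf rest
  return (Step (a :> rest'))

-- Streaming replicateM: the action runs only as each element is demanded.
replicateM' :: Monad m => Int -> m a -> Stream (Of a) m ()
replicateM' n act
  | n <= 0 = Return ()
  | otherwise = Effect $ do
      a <- act
      return (Step (a :> replicateM' (n - 1) act))

-- Read the elements out of an effect-free stream.
toListPure :: Stream (Of a) Identity r -> [a]
toListPure (Return _) = []
toListPure (Effect (Identity s)) = toListPure s
toListPure (Step (a :> rest)) = a : toListPure rest

main :: IO ()
main = do
  counter <- newIORef (0 :: Int)
  -- An observable action: bump the counter and report its new value.
  let tick = modifyIORef' counter (+ 1) >> readIORef counter
  pureStream <- accumulateOf (replicateM' 3 tick)
  print (toListPure pureStream) -- prints [1,2,3]
```

All three ticks have already happened by the time pureStream is bound, which is exactly the behaviour of Control.Monad.replicateM that the streaming version lets you avoid.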

Interoperation with the streaming-io libraries

The simplest form of interoperation with pipes is accomplished with this isomorphism:

Pipes.unfoldr Streaming.next        :: Stream (Of a) m r   -> Producer a m r
Streaming.unfoldr Pipes.next        :: Producer a m r      -> Stream (Of a) m r
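The shape of this conversion can be sketched with base alone. Below, Stream and Of are simplified stand-ins, next and unfoldr are written out by hand as assumptions about how such functions might look, and listStep is a hypothetical "foreign" producer standing in for something like Pipes.next. The idea illustrated is that any source exposing a step function of type s -> m (Either r (a, s)) can be fed to unfoldr, and next exposes a stream in that same shape.

```haskell
module Main where

-- Simplified stand-in for the library's Stream type (an assumption of this sketch).
data Stream f m r
  = Step (f (Stream f m r))
  | Effect (m (Stream f m r))
  | Return r

-- Left-strict pair: one element together with whatever follows it.
data Of a b = !a :> b

-- Inspect the head of a stream: either it has ended, or here is an element and the rest.
next :: Monad m => Stream (Of a) m r -> m (Either r (a, Stream (Of a) m r))
next (Return r) = return (Left r)
next (Effect m) = m >>= next
next (Step (a :> rest)) = return (Right (a, rest))

-- Build a stream from a seed and an effectful step function.
unfoldr :: Monad m => (s -> m (Either r (a, s))) -> s -> Stream (Of a) m r
unfoldr step = loop
  where
    loop s = Effect $ do
      e <- step s
      return $ case e of
        Left r -> Return r
        Right (a, s') -> Step (a :> loop s')

-- Hypothetical foreign producer: a list handed out one element at a time in IO.
listStep :: [a] -> IO (Either () (a, [a]))
listStep [] = return (Left ())
listStep (x : xs) = return (Right (x, xs))

-- Collect the elements of a stream, running its effects.
toListS :: Monad m => Stream (Of a) m r -> m [a]
toListS (Return _) = return []
toListS (Effect m) = m >>= toListS
toListS (Step (a :> rest)) = fmap (a :) (toListS rest)

main :: IO ()
main = do
  let imported = unfoldr listStep [1, 2, 3 :: Int] -- import from the foreign source
      roundTrip = unfoldr next imported            -- re-export through next
  xs <- toListS roundTrip
  print xs -- prints [1,2,3]
```

Because both directions go through the same Either r (a, s) step shape, each conversion costs one pass over the elements and nothing more.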

(streaming can be mixed with pipes wherever pipes itself employs Control.Monad.Trans.Free; speedups are frequently appreciable.) Interoperation with io-streams is thus:

Streaming.reread IOStreams.read     :: InputStream a       -> Stream (Of a) IO ()
IOStreams.unfoldM Streaming.uncons  :: Stream (Of a) IO () -> IO (InputStream a)

A simple exit to conduit would be, e.g.:

Conduit.unfoldM Streaming.uncons    :: Stream (Of a) m ()  -> Source m a

These conversions should never be more expensive than a single >-> or =$=.

At a much more general level, we also of course have interoperation with free:

Free.iterTM  Stream.wrap              :: FreeT f m a -> Stream f m a
Stream.iterTM Free.wrap               :: Stream f m a -> FreeT f m a

Examples

For some simple ghci examples, see the commentary throughout the Prelude module. For slightly more advanced usage see the commentary in the haddocks of streaming-bytestring and e.g. these replicas of shell-like programs from the io-streams tutorial. Here's a simple streaming GET request with intrinsically streaming byte streams.

Problems

Questions about this library can be put as issues through the github site or on the pipes mailing list. (This library understands itself as part of the pipes "ecosystem.")