2023-07-29

Building a resilient RSS feed unifier with ZIO


Functional effect libraries are an important part of the Scala ecosystem, so I've been playing with them for a while. The static-site generator that produces this site, as well as the templating engine that underlies it are both built with ZIO.

But in both those projects, I've quietly wondered whether whatever benefit I get from working through functional effects outweighs the extra ceremony and abstraction it requires. One likes to keep up with the times, sure, but is the new thing progress or merely fashion?

I now spread my writing over several blogs and microblogs. A few days ago, I decided to write a service to unify the sites' several RSS feeds into a convenient single feed for all of my writing. (Actually two feeds: all-blogs.rss and all-blogs-and-microblogs.rss) The service needs periodically to load source feeds from each of the several sites, then merge them into a single RSS document and stand ready to serve them to subscribers.

The project is unify-rss on github.

It's a pretty trivial application, but it does need to be fault tolerant. It oughtn't break if, for example, one of the source feeds go down.

Error-handling / retry logic needn't complicated, but in my experience it often becomes verbose. It can render core "happy path" functionality less clear and readable, and make applications more complicated to maintain. But ZIO really shines at concisely "transforming effects", so that retry logic mostly stays out of the way. This was the first project where I was sure the extra indirection of functional effects was "worth it".

Simplifying, the basic logic of the application looks like this:

import scala.collection.*
import scala.xml.{Elem, XML}
import java.net.URL
import zio.*

// this is much simpler than the real AppConfig!
case class AppConfig( sourceUrls : immutable.Seq[URL], refreshSeconds : Int )

def fetchFeed(url : URL) : Task[Elem] =
  ZIO.attemptBlocking(XML.load(url))

def fetchFeeds(urls : Iterable[URL]) : Task[immutable.Seq[Elem]] =
  ZIO.collectAllPar(Chunk.fromIterable(urls.map(fetchFeed)))

def mergeFeeds( config : AppConfig, elems : immutable.Seq[Elem] ) : Task[immutable.Seq[Byte]] = ???

def initMergeFeed( config : AppConfig ) : Task[Ref[immutable.Seq[Byte]]]  =
  for
    elems <- fetchFeeds( config.sourceUrls )
    feed  <- mergeFeeds( config, elems )
    ref   <- Ref.make(feed)
  yield ref

def updateMergedFeed( config : AppConfig, ref : Ref[immutable.Seq[Byte]] ) : Task[Unit] =
  for
    elems <- fetchFeeds( config.sourceUrls )
    feed  <- mergeFeeds( config, elems )
    _     <- ref.set(feed)
  yield ()

def periodicallyUpdateMergedFeed( config : AppConfig, ref : Ref[immutable.Seq[Byte]] ) : Task[Long] =
  updateMergedFeed( config, ref ).schedule( Schedule.fixed( Duration.fromSeconds(config.refreshSeconds) ) )

// use tapir withh http-zio to create an effect starting a web endpoint that serves RSS from the ref
def server(ac : AppConfig, ref : Ref[immutable.Seq[Byte]] ) : UIO[ExitCode] = ???

object Main extends ZIOAppDefault:
  val config : AppConfig = ???

  override def run =
    for
      ref      <- initMergeFeed( config )
      _        <- periodicallyUpdateMergedFeed( config, ref ).forkDaemon
      exitCode <- server( config, ref )
    yield exitCode

It's super simple. We set up a thread-safe (well, um, fiber-safe) zio.Ref to hold the initial merged feed, then we fork a separate fiber to run an effect that periodically updates the Ref, then we start up an http service that serves the contents of the ref.

The super concise refresh loop logic was definitely a ZIO advantage.

updateMergedFeed( config, ref ).schedule( Schedule.fixed( Duration.fromSeconds(config.refreshSeconds) ) )

But we are not resilient yet! What if updateMergedFeed(...) fails? For now the effect would just end, and our server would break. An RSS feed that never updates is worse than no RSS feed at all.

But, with ZIO it is trivial to solve this problem. We just define a retry Schedule, and have the update attempt retry until it hopefully, eventually, succeeds.

def retrySchedule( config : AppConfig ) =
  Schedule.exponential( 10.seconds, 1.5d ) || Schedule.fixed( Duration.fromSeconds( config.refreshSeconds ) ) 

def periodicallyUpdateMergedFeed( config : AppConfig, ref : Ref[immutable.Seq[Byte]] ) : Task[Long] =
  val resilient = updateMergedFeed( config, ref ).schedule( retrySchedule( config ) )
  resilient.schedule( Schedule.fixed( Duration.fromSeconds(config.refreshSeconds) ) )

This retry schedule will make a first retry attempt after 10 seconds, then after 15 seconds, then after (1.52 * 10 =) 22.5 seconds, etc. — retrying less and less often but no less often than the configured refresh frequency for the unified feed.

Cool! Now our application won't break if any part of an update attempt fails, but will sensibly retry until it succeeds.

But.

What if it never does succeed? What if just one of the several source feeds getting merged goes fairly permanently down? Should that prevent the merged feed from updating indefinitely?

I decided that it should not, and that instead if a single feed is unavailable, we should just omit its items and merge the rest. So, we revise...

def fetchFeed(url : URL) : Task[Option[Elem]] =
  ZIO.attemptBlocking(XML.load(url))
    .foldCauseZIO(
      cause => ZIO.logCause(s"Problem loading feed '${url}'", cause) *> ZIO.succeed(None),
      elem => ZIO.succeed(Some(elem))
    )

def fetchFeeds(urls : Iterable[URL]) : Task[immutable.Seq[Elem]] =
  ZIO.collectAllPar(Chunk.fromIterable(urls.map(fetchFeed)))
    .map( _.collect { case Some(elem) => elem } )

Now we recover from an individual feed-fetch failure, and return an Option, Some(elem) on success, None otherwise. We then collect the successes and build our merged feed from those.

But maybe we should retry our update attempts for a source feed before returning None, in case there's a very transient hitch?

def fetchFeed(url : URL) : Task[Option[Elem]] =
  ZIO.attemptBlocking(XML.load(url))
    .retry( Schedule.spaced( 6.seconds ).upTo( 60.seconds ) )
    .foldCauseZIO(
      cause => ZIO.logCause(s"Problem loading feed '${url}'", cause) *> ZIO.succeed(None),
      elem => ZIO.succeed(Some(elem))
    )

Great! Now we'll try a source feed for up to a minute before giving up on it.

The actual application is more complicated than this, mostly because instead of serving just a single merged feed, it permits you to configure any number of feeds to merge from different groups of source feeds. So, for example, where we have a Ref in the logic above, unify-rss has a Map of feed paths to Ref. The logic has to update all the Refs, rather than just a single one, periodically.

But that's just book-keeping. At its core, this was a pretty simple app. With ZIO the logic could be expressed safely and concisely, and it was easy to evolve the app from an initial, brittle sketch to one that should be resilient as a permanent service.


Note: I recently went through Daniel Ciocîrlan's ZIO 2 video course, which was very, very helpful for this project. Highly recommended!


Update 2023-11-10: I've added to this app functionality to generate feeds as static files, rather than to serve them from memory as a (resilient!) daemon. That saves some memory and overhead on the server side. Instead of periodically updating feeds in the app, now we just define a systemd timer to rerun a static-file generating version of the app every half hour. It's a less sexy app somehow, this way, but a bit simpler and conserves memory.

We still do the retries described above, when feeds fail to load. But we let systemd take care of the periodic regeneratios.

See the current README for more information.