Reactive programming is amazing. Hard to understand when you first see it, it solves problems that are really difficult to code without it. It's available in many languages and very well-known by AngularJS users, because it makes an extensive use of it.

The problem

Let's see what's reactive programming before seeing a "real" use case. The easiest way to define it (for me) is comparing it to an Excel file:

  • The first two columns are independent values
  • Columns C and D depend on A and B
  • Column E depends on column D
  • Note that some columns are subscribed to other columns and that all any column can be subscribed by others.
  • Values in E depend on D, which depends on A and B, but E doesn't know or better, doesn't care about this.
    • You can change the formula for D and E will still work.

Let's imagine a GIS example now. pypros is a software I contributed in some time ago when working at the Catalan Meteorological Service.

Basically, it takes the temperature field, the relative humidity (or dew point temperature), the radar signal and outputs a file where each pixel has a value depending on the type of precipitation. You can see it in action here.

  • With temperature and the Dew point, we get what would happen if there was precipitation in a given point
  • With the previous result and the radar image, we can get the final product
  • The main problem to code this is that the frequencies aren't equal for all the fields, so coding it directly means remembering the latest processed images and so on
  • Complicating the diagram is easy with reactive programming because each step will be watching only the interesting observables

The code

As usual, you can get the code at GitHub.

So the first thing to do is act when some file appears. The file could be a temperature, dew point or radar file. To do it, we'll use the watchdog library

import time
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
from observables import source

class MyHandler(FileSystemEventHandler):
    def next_source(self, path):
        source.on_next(path)
    def on_created(self, event):
        self.next_source(event.src_path)
    def on_modified(self, event):
        self.next_source(event.src_path)

if __name__ == "__main__":
    path = './data'
    event_handler = MyHandler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    finally:
        observer.stop()
        observer.join()

  • The Observer object will call the methods in MyHandler when some change occurs in the path
  • source is the RxPY thing that we'll see next. When something changes, the on_next method is called with the path for the file that changed

Let's see now a first version of the RxPY part

import time
from os.path import basename
from pathlib import Path
import rx
from rx import operators as ops
from rx.subject import Subject

def get_date(x):
    return x.split("-")[1].replace(".tiff","")

def process_potential_ros(x):
    print("CREATING POTENTIAL ROS", './data/pot_pon_'+get_date(x[0])+"-"+get_date(x[1])+".tiff")
    time.sleep(1)
    Path('./data/pot_ros_'+get_date(x[0])+"-"+get_date(x[1])+".tiff").touch()

def process_ros(x):
    print("CREATING ROS FILE", './data/pon_'+get_date(x[0])+"-"+get_date(x[1])+".tiff")
    time.sleep(1)
    Path('./data/ros_'+get_date(x[0])+"-"+get_date(x[1])+".tiff").touch()

source = Subject()

td = source.pipe(ops.filter(lambda text: text.find('td')>=0))
temp = source.pipe(ops.filter(lambda text: text.find('temp')>=0))
radar = source.pipe(ops.filter(lambda text: text.find('radar')>=0))
pot_ros = source.pipe(ops.filter(lambda text: text.find('pot_ros')>=0))

rx.combine_latest(temp, td).subscribe(process_potential_ros)
rx.combine_latest(pot_ros, radar).subscribe(process_ros)

  • The first functions are just mock functions that simulate the creation of the actual files. That's why the time.sleep() is there
  • source is a Subject, that can recieve the on_next method and be subscribed. Basically, whan any file changes, the source will emit its path
  • The next lines are observables subscribed to source. They filter the path so they will only emit when the string appears in the path. To when a temperature file appears, the path is emitted by temp
  • Finally, combine_latest takes two observables and emits an event with the latest values for each one. You can check the diagram here
    • The first one creates the potential RoS file that will trigger the source again, because it creates a file
    • The second one takes the previpusly created file and the radar to create the final image

The output is something similar to:


CREATING POTENTIAL ROS ./data/pot_pon_202101010000-202101010000.tiff
CREATING ROS FILE ./data/pon_202101010000-202101010000.tiff
CREATING ROS FILE ./data/pon_202101010000-202101010006.tiff
CREATING ROS FILE ./data/pon_202101010000-202101010012.tiff
CREATING ROS FILE ./data/pon_202101010000-202101010018.tiff
CREATING ROS FILE ./data/pon_202101010000-202101010024.tiff
CREATING ROS FILE ./data/pon_202101010000-202101010030.tiff
CREATING ROS FILE ./data/pon_202101010000-202101010036.tiff
CREATING ROS FILE ./data/pon_202101010000-202101010042.tiff
CREATING ROS FILE ./data/pon_202101010000-202101010048.tiff
CREATING ROS FILE ./data/pon_202101010000-202101010054.tiff
CREATING ROS FILE ./data/pon_202101010000-202101010100.tiff
CREATING POTENTIAL ROS ./data/pot_pon_202101010100-202101010000.tiff
CREATING POTENTIAL ROS ./data/pot_pon_202101010100-202101010100.tiff
CREATING ROS FILE ./data/pon_202101010000-202101010106.tiff
CREATING ROS FILE ./data/pon_202101010000-202101010106.tiff
CREATING ROS FILE ./data/pon_202101010100-202101010106.tiff 

To do it, I created a file that generates radar and temperature and td files like this:

from pathlib import Path
import time

Path('./data/temp-202101010000.tiff').touch()
time.sleep(1)
Path('./data/radar-202101010000.tiff').touch()
time.sleep(1)
Path('./data/td-202101010000.tiff').touch()
time.sleep(1)
Path('./data/radar-202101010006.tiff').touch()
...

Improving the code by using more RxPY

The previous code has three problems (at least)

  1. The Potential Rain Or Snow is saved in a file. This is not wring but it's an intermediate step and maybe we don't want to save it
  2. There are two Potential Rain Or Snow created at the end. This is because combine_latest will emit twice when eiter Td or Temperature appear. Usually, we only want this to be created if the dates for both files are identical (they should appear more or less at the same time)
  3. time.sleep() will block the process. If the file generation lasts a lot, some events will have to wait. It would be much nicer if this calculation happens in another process and when is finished, emits the event (or saves the file)

To solve the first point, we'll use map, that converts the input value to another thing, as in functional programming:

pot_ros = rx.combine_latest(temp, td).pipe(ops.map(create_pot_data))

Then, when subscribing to the pot_ros observable we could receive the calculated field (calculated in create_pot_data) without having to save

To avoid create_pot_data to be run twice when td and temperature are different, a filter can be used:

pot_ros = rx.combine_latest(temp, td).pipe(ops.filter(lambda values: get_date(values[0])==get_date(values[1])), ops.map(create_pot_data))

This way, only the elements that match the filter function will pass to the map.

Finally, to solve the process blocking, we can add a ops.subscribe_on(thread_pool_scheduler). This will make each subscription to run in a different process. I added a long sleep time to the file creation to show that in the same pipe, everything i in the same process. Probably this can be avoided withflat_map and from_future, but I'm not sure that it's a nice feature in a real case.

import multiprocessing
import time
from os.path import basename
from pathlib import Path

import rx
from rx import operators as ops
from rx.core.typing import Observable
from rx.scheduler import ThreadPoolScheduler
from rx.subject import Subject

thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))

def get_date(x):
    return x.split("-")[1].replace(".tiff","")

def process_ros(x):
    fut = asyncio.create_task(foo())
    def when_finished(<em>fut):
        print("CREATING ROS FILE", x)
        time.sleep(10)
        Path('./data/ros</em>'+x[0]+"-"+get_date(x[1])+".tiff").touch()
    fut.add_done_callback(when_finished)

def create_pot_data(x):
    print("CREATING POT ROS DATA", x)
    time.sleep(1)
    return get_date(x[0])

source = Subject()

td = source.pipe(ops.filter(lambda text: text.find('td')>=0), ops.subscribe_on(thread_pool_scheduler))
temp = source.pipe(ops.filter(lambda text: text.find('temp')>=0), ops.subscribe_on(thread_pool_scheduler))
radar = source.pipe(ops.filter(lambda text: text.find('radar')>=0))

pot_ros = rx.combine_latest(temp, td).pipe( ops.filter(lambda values: get_date(values[0])==get_date(values[1])), ops.map(create_pot_data), ops.subscribe_on(thread_pool_scheduler))
rx.combine_latest(pot_ros, radar).subscribe(process_ros, scheduler=thread_pool_scheduler)

Links