Welcome!

Blog Feed Post

RX: Reactive Libraries

Over the last several years, the term “reactive programming” became popular in many programming languages. Reactive Manifesto was published although it gives a rather generic definition of reactive systems http://www.reactivemanifesto.org. Yes, an app should response fast (Responsive), remain functional in case of errors (Resilient), be flexible in regards to increasing/decreasing computational resources (Elastic), and be based on asynchronous events (Message Driven).

Declaring the principles is a good start, but how to apply these principles in a concrete app? Let’a talk about reactive extensions libraries available for many programming languages.

About seven years ago, Erik Meijer from Microsoft created Reactive Extensions (Rx) – a set of libraries Rx.NET for processing asynchronous event-driven data streams. For example, someone posted a tweet and you received an immediate notification.

A non-reactive way of receiving tweets would be visiting twitter.com every now and then and reload the page hoping that one of the people you follow posted a new tweet – this is known as polling. The load on the server will substantially increase if every user will keep polling the server. The push model is a lot more efficient – just subscribe to the tweeter feed and get the tweets asynchronously pushed to you as they become available.

An online auction is another use case for the async data push. A user bids on the product, but other users may overbid her. Bids should be implemented as a stream that allows subscription so the users won’t need to constantly check if their offers are still the winning ones.

Another example is a stream on stock prices during the working ours of a stock exchange. Or take a stream of signals from a sensor (e.g. an accelerometer in your phone). Even the process of dragging the mouse over the screen can be treated as a stream of coordinates of the mouse pointer.

Five years ago, Microsoft released Rx.NET as an open source project. People liked it and the library was ported to other programming languages: RxCpp, RxJS, RxPHP, Rx.rb, Rx.py, RxJava, RxSwift, RxScala, RxKotlin.

Disclaimer. This post is not a Rx tutorial, but a brief introduction of the main Rx players. I work with RxJS and RxJava, but in this post, I’ll be using code samples in JavaScript

Let’s get familiar with the main concepts of Rx libraries, but first consider this non-reactive code:

int a1 = 2;

int b1 = 4;



int c1 = a1 + b1;  // c1 = 6

    

a1 = 55;           // c1 = 6, but should be 59    
b1 = 20;           // c1 = 6, but should be 75

After the execution of this code, c1 is still equal to 6. Sure enough, we could add more code to recalculate c1 after the values of both a1 and b1 changed, but a more proper way to handle this is by making c1 to be immediately recalculated as soon as either a1 or b1 change as in Excel spreadsheet. In other words, it would be nice to switch to the push model, where the new and asynchronously changed values are pushed to their consumer. We want to move away from the pull model, where the consumer is constantly asking the producer, “Do you have something new for me?… How about now?… Maybe now you have something?”

Observable, Observer, Subscriber

The main players of any Rx library are Observable, Observer, and Subscriber.

* Observable – an object or a function that emits sequences of data over time (a.k.a. producer)

* Observer – an object or a function that knows how to process the sequences of data (a.k.a. consumer)

* Subscriber – an object or a function that connects an Observable with Observer(s)

https://yakovfain.files.wordpress.com/2017/04/push.png?w=1518&h=606 1518w, https://yakovfain.files.wordpress.com/2017/04/push.png?w=150&h=60 150w, https://yakovfain.files.wordpress.com/2017/04/push.png?w=300&h=120 300w, https://yakovfain.files.wordpress.com/2017/04/push.png?w=768&h=307 768w, https://yakovfain.files.wordpress.com/2017/04/push.png?w=1024&h=409 1024w" sizes="(max-width: 760px) 100vw, 760px" />

After looking at this diagram, many software developers will say, “We already know this. This is a pub-sub messaging with the implementation of the Observer pattern.” To some extent, this is correct, but there’s more to it:

1. Rx is meant for the asynchronous non-blocking data processing.

2. Rx offers a simple API with dedicated channels for sending data, errors, and the end-of-stream signal.

3. Any Rx library has about a hundred operators that can be applied to the data stream en route. Operators are easily composable.

4. Some of the Rx implementations (e.g. RxJava2) support the backpressure well. This is a scenario when a producer emits data faster than a consumer can handle.

5. You don’t need special messaging servers to use a Rx library. Everything you need is a part of your app.

6. In languages that support multi-threading, working with threads as well as switching between the threads is easier.Android developers will appreciate this because the UI rendering has to be done in the main thread while the calculation in others.

So how an Observable sends the data to the Observer? An Observer can implement three methods (their names may slightly vary depending on the language you use):

* next() – here’s a new value from the stream
* error() – here’s an error happened in the stream
* complete() – the stream’s over

In the next code sample, the function getData() turns the array with beers into an Observable and returns it back. Returns to whom? To the subscriber, when some other code invokes subscribe(). A subscriber – getData().subscribe(myObserver) – passes an Observer, as an argument to the function subscribe(). Accordingly, an Observer can implement three functions:

* Handling the next element from the stream
* Handling the stream error
* Handling the end of stream, if needed

// Defining the function that returns an Observable
function getData(){

    var beers = [
        {name: "Stella", country: "Belgium", price: 9.50},
        {name: "Sam Adams", country: "USA", price: 8.50},
        {name: "Bud Light", country: "USA", price: 6.50},
        {name: "Brooklyn Lager", country: "USA", price: 8.00},
        {name: "Sapporo", country: "Japan", price: 7.50}
    ];

// The observer will be provided at the time of subscription
    return Rx.Observable.create( observer => {

              beers.forEach( beer => observer.next(beer));
              observer.complete();
           }
    );
}

// Calling the function that subscribe to the observable
// The function subscribe() receives the Observer, represented by three functions
getData()
     .subscribe(
         beer  => console.log("Subscriber got " + beer),   // handling the arrived data
         error => console.err(error),                      // an error arrived
            () > console.log("The stream is over")         // the signal that the stream completed arrived
);

Our Observer consists of three fat arrow functions (=>). This syntax was introduced in the ECMAScript 6 spec. Our fat arrow callbacks may be invoked only after we invoked subscribe(). You can see this code sample in action here (open the browser’s console and click on the Run button).

Operators

Operators are functions that can transform the stream data between the moments when the Observable sent them and the function subscribe() received them. In other words, we can transform the data en route. Rx libraries have lots of operators.

https://yakovfain.files.wordpress.com/2017/04/operators.png?w=1520&h=948 1520w, https://yakovfain.files.wordpress.com/2017/04/operators.png?w=150&h=94 150w, https://yakovfain.files.wordpress.com/2017/04/operators.png?w=300&h=187 300w, https://yakovfain.files.wordpress.com/2017/04/operators.png?w=768&h=479 768w, https://yakovfain.files.wordpress.com/2017/04/operators.png?w=1024&h=639 1024w" sizes="(max-width: 760px) 100vw, 760px" />

Each operator is a function that takes an Observable as an argument, transforms (or ignores) it, and returns another Observable. Since the input and output of any operator have the same type, you can chain them up. Here’s how you can filter out the beer that’s more expensive than 8 dollars and convert the instances of the Beer object into strings.

https://yakovfain.files.wordpress.com/2017/04/filter_map.png?w=1518&h=552 1518w, https://yakovfain.files.wordpress.com/2017/04/filter_map.png?w=150&h=55 150w, https://yakovfain.files.wordpress.com/2017/04/filter_map.png?w=300&h=109 300w, https://yakovfain.files.wordpress.com/2017/04/filter_map.png?w=768&h=279 768w, https://yakovfain.files.wordpress.com/2017/04/filter_map.png?w=1024&h=372 1024w" sizes="(max-width: 760px) 100vw, 760px" />

Studying Rx operators require a time investment, and I’m planning to write more about them. The RX docs often include marble diagrams that help in understanding what a particular operator does. As an example, the marble diagram for the filter operator looks as follows:

https://yakovfain.files.wordpress.com/2017/04/marble.png?w=150&h=70 150w, https://yakovfain.files.wordpress.com/2017/04/marble.png?w=300&h=139 300w, https://yakovfain.files.wordpress.com/2017/04/marble.png?w=768&h=357 768w, https://yakovfain.files.wordpress.com/2017/04/marble.png?w=1024&h=476 1024w, https://yakovfain.files.wordpress.com/2017/04/marble.png 1368w" sizes="(max-width: 760px) 100vw, 760px" />

On top, the incoming stream (an Observable) is represented by various geometrical shapes. Then the filter operator ignores every element but circles, and the resulting Observables will contain only the circles. When you look at the circle, visualize think beers that are cheaper than eight dollars.

Still, how to make c1=a1+b1 reactive?

First, convert a1 and b1 into streams, for example:

const a1 = Rx.Observable.from([2, 55]);

But this stream will shoot 2 and 55 instantaneously, so let’s add the time dimension. To emulate a delay you can use another stream that just emits sequential numbers and join it using the zip operator with the stream that emits 2 and 55:

const a1 = Rx.Observable.from([2, 55])
  .zip(Rx.Observable.interval(1200), x => x);

When someone subscribes to a1, it’ll emit 2 and in 1.2 seconds 55. Similarly, let’s create a stream for b1 but with a delay of 1.5 seconds. Then, using streams composition and the operator combineLatest, we combine streams a1 and b1 and add their latest values. The entire code will look as follows:

const a1 = Rx.Observable.from([2, 55])
  .zip(Rx.Observable.interval(1200), x => x);
  
const b1 = Rx.Observable.from([4, 20])
  .zip(Rx.Observable.interval(1500), x => x);

a1.combineLatest(b1, (x, y) => x + y)
  .subscribe(val => console.log("c1=" + val));

To see this code in action, visit the Plunker at http://bit.ly/2nphn0k, open the browser’s console and click on the button Run. you’ll see how c1 is recalculated as soon as either a1 or b1 is changing.

https://yakovfain.files.wordpress.com/2017/04/a1b1c1.png?w=150&h=124 150w, https://yakovfain.files.wordpress.com/2017/04/a1b1c1.png?w=300&h=247 300w, https://yakovfain.files.wordpress.com/2017/04/a1b1c1.png?w=768&h=633 768w, https://yakovfain.files.wordpress.com/2017/04/a1b1c1.png?w=1024&h=844 1024w, https://yakovfain.files.wordpress.com/2017/04/a1b1c1.png 1218w" sizes="(max-width: 760px) 100vw, 760px" />

If you haven’t worked with reactive libraries, take a look at the one available for your programming language and start using it in a real-world project.

A word of caution. Rx libraries allow you to write less code, but the code readability suffer. The person who reads the code needs to know Rx as well.

On the positive side, Rx libraries don’t require you to change the architecture of the entire project. Use them where you can make the async data to flow through a sequence of algorithms (think operators).

https://yakovfain.files.wordpress.com/2017/04/img_5417-1.jpg?w=112&h=150 112w, https://yakovfain.files.wordpress.com/2017/04/img_5417-1.jpg?w=224&h=300 224w, https://yakovfain.files.wordpress.com/2017/04/img_5417-1.jpg?w=768&h=1031 768w, https://yakovfain.files.wordpress.com/2017/04/img_5417-1.jpg?w=763&h=1024 763w, https://yakovfain.files.wordpress.com/2017/04/img_5417-1.jpg 1216w" sizes="(max-width: 760px) 100vw, 760px" />


Read the original blog entry...

More Stories By Yakov Fain

Yakov Fain is a Java Champion and a co-founder of the IT consultancy Farata Systems and the product company SuranceBay. He wrote a thousand blogs (http://yakovfain.com) and several books about software development. Yakov authored and co-authored such books as "Angular 2 Development with TypeScript", "Java 24-Hour Trainer", and "Enterprise Web Development". His Twitter tag is @yfain

Latest Stories
Internet of @ThingsExpo, taking place October 31 - November 2, 2017, at the Santa Clara Convention Center in Santa Clara, CA, is co-located with 21st Cloud Expo and will feature technical sessions from a rock star conference faculty and the leading industry players in the world. The Internet of Things (IoT) is the most profound change in personal and enterprise IT since the creation of the Worldwide Web more than 20 years ago. All major researchers estimate there will be tens of billions devic...
"The Striim platform is a full end-to-end streaming integration and analytics platform that is middleware that covers a lot of different use cases," explained Steve Wilkes, Founder and CTO at Striim, in this SYS-CON.tv interview at 20th Cloud Expo, held June 6-8, 2017, at the Javits Center in New York City, NY.
"With Digital Experience Monitoring what used to be a simple visit to a web page has exploded into app on phones, data from social media feeds, competitive benchmarking - these are all components that are only available because of some type of digital asset," explained Leo Vasiliou, Director of Web Performance Engineering at Catchpoint Systems, in this SYS-CON.tv interview at DevOps Summit at 20th Cloud Expo, held June 6-8, 2017, at the Javits Center in New York City, NY.
21st International Cloud Expo, taking place October 31 - November 2, 2017, at the Santa Clara Convention Center in Santa Clara, CA, will feature technical sessions from a rock star conference faculty and the leading industry players in the world. Cloud computing is now being embraced by a majority of enterprises of all sizes. Yesterday's debate about public vs. private has transformed into the reality of hybrid cloud: a recent survey shows that 74% of enterprises have a hybrid cloud strategy. Me...
SYS-CON Events announced today that Datera, that offers a radically new data management architecture, has been named "Exhibitor" of SYS-CON's 21st International Cloud Expo ®, which will take place on Oct 31 - Nov 2, 2017, at the Santa Clara Convention Center in Santa Clara, CA. Datera is transforming the traditional datacenter model through modern cloud simplicity. The technology industry is at another major inflection point. The rise of mobile, the Internet of Things, data storage and Big...
SYS-CON Events announced today that DXWorldExpo has been named “Global Sponsor” of SYS-CON's 21st International Cloud Expo, which will take place on Oct 31 – Nov 2, 2017, at the Santa Clara Convention Center in Santa Clara, CA. Digital Transformation is the key issue driving the global enterprise IT business. Digital Transformation is most prominent among Global 2000 enterprises and government institutions.
Kubernetes is an open source system for automating deployment, scaling, and management of containerized applications. Kubernetes was originally built by Google, leveraging years of experience with managing container workloads, and is now a Cloud Native Compute Foundation (CNCF) project. Kubernetes has been widely adopted by the community, supported on all major public and private cloud providers, and is gaining rapid adoption in enterprises. However, Kubernetes may seem intimidating and complex ...
"Outscale was founded in 2010, is based in France, is a strategic partner to Dassault Systémes and has done quite a bit of work with divisions of Dassault," explained Jackie Funk, Digital Marketing exec at Outscale, in this SYS-CON.tv interview at 20th Cloud Expo, held June 6-8, 2017, at the Javits Center in New York City, NY.
"We focus on SAP workloads because they are among the most powerful but somewhat challenging workloads out there to take into public cloud," explained Swen Conrad, CEO of Ocean9, Inc., in this SYS-CON.tv interview at 20th Cloud Expo, held June 6-8, 2017, at the Javits Center in New York City, NY.
"I think DevOps is now a rambunctious teenager – it’s starting to get a mind of its own, wanting to get its own things but it still needs some adult supervision," explained Thomas Hooker, VP of marketing at CollabNet, in this SYS-CON.tv interview at DevOps Summit at 20th Cloud Expo, held June 6-8, 2017, at the Javits Center in New York City, NY.
"We are still a relatively small software house and we are focusing on certain industries like FinTech, med tech, energy and utilities. We help our customers with their digital transformation," noted Piotr Stawinski, Founder and CEO of EARP Integration, in this SYS-CON.tv interview at 20th Cloud Expo, held June 6-8, 2017, at the Javits Center in New York City, NY.
"We've been engaging with a lot of customers including Panasonic, we've been involved with Cisco and now we're working with the U.S. government - the Department of Homeland Security," explained Peter Jung, Chief Product Officer at Pulzze Systems, in this SYS-CON.tv interview at @ThingsExpo, held June 6-8, 2017, at the Javits Center in New York City, NY.
"We're here to tell the world about our cloud-scale infrastructure that we have at Juniper combined with the world-class security that we put into the cloud," explained Lisa Guess, VP of Systems Engineering at Juniper Networks, in this SYS-CON.tv interview at 20th Cloud Expo, held June 6-8, 2017, at the Javits Center in New York City, NY.
Your homes and cars can be automated and self-serviced. Why can't your storage? From simply asking questions to analyze and troubleshoot your infrastructure, to provisioning storage with snapshots, recovery and replication, your wildest sci-fi dream has come true. In his session at @DevOpsSummit at 20th Cloud Expo, Dan Florea, Director of Product Management at Tintri, provided a ChatOps demo where you can talk to your storage and manage it from anywhere, through Slack and similar services with...
As enterprise cloud becomes the norm, businesses and government programs must address compounded regulatory compliance related to data privacy and information protection. The most recent, Controlled Unclassified Information and the EU’s GDPR have board level implications and companies still struggle with demonstrating due diligence. Developers and DevOps leaders, as part of the pre-planning process and the associated supply chain, could benefit from updating their code libraries and design by in...