RX: Reactive Libraries

Over the last several years, the term “reactive programming” has become popular in many programming languages. The Reactive Manifesto (http://www.reactivemanifesto.org) was published, although it gives a rather generic definition of reactive systems: an app should respond quickly (Responsive), remain functional in case of errors (Resilient), adapt to increasing or decreasing computational resources (Elastic), and be based on asynchronous messages (Message Driven).

Declaring the principles is a good start, but how do you apply them in a concrete app? Let’s talk about the reactive extensions libraries available for many programming languages.

About seven years ago, Erik Meijer from Microsoft created Reactive Extensions (Rx): Rx.NET, a set of libraries for processing asynchronous, event-driven data streams. For example, someone posts a tweet and you receive an immediate notification.

A non-reactive way of receiving tweets would be to visit twitter.com every now and then and reload the page, hoping that one of the people you follow has posted a new tweet (this is known as polling). The load on the server increases substantially if every user keeps polling it. The push model is a lot more efficient: just subscribe to the Twitter feed and get the tweets asynchronously pushed to you as they become available.

An online auction is another use case for the async data push. A user bids on a product, but other users may outbid her. Bids should be implemented as a stream that allows subscription, so users won’t need to constantly check whether their offers are still the winning ones.

Another example is a stream of stock prices during the working hours of a stock exchange. Or take a stream of signals from a sensor (e.g. an accelerometer in your phone). Even the process of dragging the mouse over the screen can be treated as a stream of coordinates of the mouse pointer.

Five years ago, Microsoft released Rx.NET as an open source project. People liked it and the library was ported to other programming languages: RxCpp, RxJS, RxPHP, Rx.rb, Rx.py, RxJava, RxSwift, RxScala, RxKotlin.

Disclaimer. This post is not an Rx tutorial, but a brief introduction to the main Rx players. I work with RxJS and RxJava, but in this post, I’ll be using code samples in JavaScript.

Let’s get familiar with the main concepts of Rx libraries, but first consider this non-reactive code:

int a1 = 2;
int b1 = 4;

int c1 = a1 + b1;  // c1 = 6

a1 = 55;           // c1 = 6, but should be 59
b1 = 20;           // c1 = 6, but should be 75

After the execution of this code, c1 is still equal to 6. Sure enough, we could add more code to recalculate c1 after the values of a1 and b1 change, but a better way to handle this is to have c1 recalculated immediately as soon as either a1 or b1 changes, as in an Excel spreadsheet. In other words, it would be nice to switch to the push model, where new, asynchronously changed values are pushed to their consumer. We want to move away from the pull model, where the consumer is constantly asking the producer, “Do you have something new for me?… How about now?… Maybe now you have something?”
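Before bringing in any Rx library, the push idea can be sketched in plain JavaScript. The makeCell helper below is hypothetical and written only for illustration; it is not part of any Rx API. Each cell notifies its listeners whenever its value changes, so c1 is recalculated without anyone polling for changes.

```javascript
// Hypothetical helper (not an Rx API): a value holder that pushes
// every change to the listeners registered with onChange().
function makeCell(initial) {
  let value = initial;
  const listeners = [];
  return {
    get: () => value,
    set(newValue) {
      value = newValue;
      listeners.forEach(listener => listener(value)); // push to consumers
    },
    onChange(listener) { listeners.push(listener); }
  };
}

const a1 = makeCell(2);
const b1 = makeCell(4);

let c1 = a1.get() + b1.get();                        // c1 = 6
const recalc = () => { c1 = a1.get() + b1.get(); };  // the consumer
a1.onChange(recalc);
b1.onChange(recalc);

a1.set(55);  // pushes the change: c1 is now 59
b1.set(20);  // pushes the change: c1 is now 75
console.log("c1=" + c1);
```

This covers only the synchronous case. Rx libraries add the time dimension, error handling, and operators on top of the same idea.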

Observable, Observer, Subscriber

The main players of any Rx library are Observable, Observer, and Subscriber.

* Observable – an object or a function that emits sequences of data over time (a.k.a. producer)

* Observer – an object or a function that knows how to process the sequences of data (a.k.a. consumer)

* Subscriber – an object or a function that connects an Observable with Observer(s)

[Diagram: https://yakovfain.files.wordpress.com/2017/04/push.png]

After looking at this diagram, many software developers will say, “We already know this. This is pub-sub messaging with an implementation of the Observer pattern.” To some extent, this is correct, but there’s more to it:

1. Rx is meant for asynchronous, non-blocking data processing.

2. Rx offers a simple API with dedicated channels for sending data, errors, and the end-of-stream signal.

3. Any Rx library has about a hundred operators that can be applied to the data stream en route. Operators are easily composable.

4. Some Rx implementations (e.g. RxJava2) support backpressure well. Backpressure handling addresses the scenario in which a producer emits data faster than a consumer can handle it.

5. You don’t need special messaging servers to use an Rx library. Everything you need is a part of your app.

6. In languages that support multi-threading, working with threads and switching between them becomes easier. Android developers will appreciate this because UI rendering has to be done in the main thread, while calculations run in other threads.

So how does an Observable send the data to the Observer? An Observer can implement three methods (their names may vary slightly depending on the language you use):

* next() – here’s a new value from the stream
* error() – an error has happened in the stream
* complete() – the stream’s over

In the next code sample, the function getData() turns the array of beers into an Observable and returns it. Returns to whom? To the subscriber, when some other code invokes subscribe(). The subscribing code, getData().subscribe(myObserver), passes an Observer as an argument to the function subscribe(). Accordingly, an Observer can implement three functions:

* Handling the next element from the stream
* Handling the stream error
* Handling the end of stream, if needed

// Defining the function that returns an Observable
// (assumes an RxJS 5-style library is loaded and exposed as the global Rx)
function getData(){

    var beers = [
        {name: "Stella", country: "Belgium", price: 9.50},
        {name: "Sam Adams", country: "USA", price: 8.50},
        {name: "Bud Light", country: "USA", price: 6.50},
        {name: "Brooklyn Lager", country: "USA", price: 8.00},
        {name: "Sapporo", country: "Japan", price: 7.50}
    ];

    // The observer will be provided at the time of subscription
    return Rx.Observable.create( observer => {
        beers.forEach( beer => observer.next(beer));  // push each beer to the observer
        observer.complete();                          // signal the end of the stream
    });
}

// Calling the function that subscribes to the observable
// The function subscribe() receives the Observer, represented by three functions
getData()
    .subscribe(
        beer  => console.log("Subscriber got " + beer.name),  // handling the arrived data
        error => console.error(error),                        // an error arrived
        ()    => console.log("The stream is over")            // the stream completed
    );

Our Observer consists of three fat arrow functions (=>). This syntax was introduced in the ECMAScript 6 spec. Our fat arrow callbacks may be invoked only after we invoked subscribe(). You can see this code sample in action here (open the browser’s console and click on the Run button).
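The sample above exercises next() and complete() but never the error channel. The following sketch shows what happens when a stream fails midway. It deliberately avoids RxJS so that it runs as-is in Node or a browser console: createObservable is a hypothetical, stripped-down stand-in for an Rx Observable, and the price data is made up for illustration.

```javascript
// Hypothetical minimal Observable, written only to illustrate the three channels;
// real Rx libraries add operators, unsubscription, schedulers, and more.
function createObservable(producer) {
  return {
    subscribe(next, error, complete) {
      let done = false; // after error() or complete(), nothing else is delivered
      producer({
        next: value => { if (!done) next(value); },
        error: err => { if (!done) { done = true; error(err); } },
        complete: () => { if (!done) { done = true; complete(); } }
      });
    }
  };
}

// A stream that fails when it meets an invalid price
const prices = createObservable(observer => {
  for (const price of [9.5, 8.5, -1, 7.5]) {
    if (price < 0) {
      observer.error(new Error("Invalid price: " + price));
      return;
    }
    observer.next(price);
  }
  observer.complete();
});

const log = [];
prices.subscribe(
  price => log.push("next " + price),         // data channel
  err   => log.push("error " + err.message),  // error channel
  ()    => log.push("complete")               // end-of-stream channel
);
console.log(log); // [ 'next 9.5', 'next 8.5', 'error Invalid price: -1' ]
```

Note that once the error is delivered, neither the remaining value nor the completion signal reaches the Observer. In this sketch, as in Rx libraries, an error terminates the stream.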

Operators

Operators are functions that can transform the stream data between the moments when the Observable sent them and the function subscribe() received them. In other words, we can transform the data en route. Rx libraries have lots of operators.

[Image: https://yakovfain.files.wordpress.com/2017/04/operators.png]

Each operator is a function that takes an Observable as an argument, transforms (or ignores) it, and returns another Observable. Since the input and output of any operator have the same type, you can chain them up. Here’s how you can filter out the beer that’s more expensive than 8 dollars and convert the instances of the Beer object into strings.

[Image: https://yakovfain.files.wordpress.com/2017/04/filter_map.png]
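The chain described above might look like the following sketch. To stay runnable without any library, it uses a hypothetical createObservable helper and hand-written filter and map operators rather than the RxJS ones. With an RxJS 5-style library, the same idea would typically be written as method calls chained on the Observable returned by getData(). The output format (name plus price) is an assumption made for this example.

```javascript
// Hypothetical minimal Observable for illustration; not the RxJS implementation.
function createObservable(producer) {
  return { subscribe: (next, error, complete) => producer({ next, error, complete }) };
}

// Each operator takes an Observable and returns a new Observable.
const filter = predicate => source =>
  createObservable(observer => source.subscribe(
    value => { if (predicate(value)) observer.next(value); },
    observer.error,
    observer.complete
  ));

const map = project => source =>
  createObservable(observer => source.subscribe(
    value => observer.next(project(value)),
    observer.error,
    observer.complete
  ));

const beers = [
  {name: "Stella", country: "Belgium", price: 9.50},
  {name: "Sam Adams", country: "USA", price: 8.50},
  {name: "Bud Light", country: "USA", price: 6.50},
  {name: "Brooklyn Lager", country: "USA", price: 8.00},
  {name: "Sapporo", country: "Japan", price: 7.50}
];

const beerStream = createObservable(observer => {
  beers.forEach(beer => observer.next(beer));
  observer.complete();
});

// Keep the beers cheaper than 8 dollars, then turn each object into a string
const cheapBeers = filter(beer => beer.price < 8)(beerStream);
const labels = map(beer => beer.name + ": $" + beer.price.toFixed(2))(cheapBeers);

const received = [];
labels.subscribe(
  label => received.push(label),
  err   => console.error(err),
  ()    => console.log(received) // [ 'Bud Light: $6.50', 'Sapporo: $7.50' ]
);
```

Because filter and map both accept and return the same kind of object, they can be applied in any order and any number of times, which is what makes operators composable.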

Studying Rx operators requires a time investment, and I’m planning to write more about them. The Rx docs often include marble diagrams that help in understanding what a particular operator does. As an example, the marble diagram for the filter operator looks as follows:

[Marble diagram: https://yakovfain.files.wordpress.com/2017/04/marble.png]

On top, the incoming stream (an Observable) is represented by various geometrical shapes. Then the filter operator ignores every element but circles, and the resulting Observable will contain only the circles. When you look at the circles, think of beers that are cheaper than eight dollars.

Still, how do we make c1 = a1 + b1 reactive?

First, convert a1 and b1 into streams, for example:

const a1 = Rx.Observable.from([2, 55]);

But this stream will shoot 2 and 55 instantaneously, so let’s add the time dimension. To emulate a delay you can use another stream that just emits sequential numbers and join it using the zip operator with the stream that emits 2 and 55:

const a1 = Rx.Observable.from([2, 55])
  .zip(Rx.Observable.interval(1200), x => x);

When someone subscribes to a1, it’ll emit 2 after 1.2 seconds (on the first tick of the interval) and 55 after another 1.2 seconds. Similarly, let’s create a stream for b1 but with a delay of 1.5 seconds. Then, using stream composition and the operator combineLatest, we combine the streams a1 and b1 and add their latest values. The entire code will look as follows:

const a1 = Rx.Observable.from([2, 55])
  .zip(Rx.Observable.interval(1200), x => x);
  
const b1 = Rx.Observable.from([4, 20])
  .zip(Rx.Observable.interval(1500), x => x);

a1.combineLatest(b1, (x, y) => x + y)
  .subscribe(val => console.log("c1=" + val));

To see this code in action, visit the Plunker at http://bit.ly/2nphn0k, open the browser’s console and click on the button Run. You’ll see how c1 is recalculated as soon as either a1 or b1 changes.

[Image: https://yakovfain.files.wordpress.com/2017/04/a1b1c1.png]
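To reason about what combineLatest prints and when, it can help to replay the two timelines by hand. The sketch below is a plain JavaScript simulation with virtual timestamps, not RxJS code. It assumes that both streams are subscribed at the same moment and that the first tick of each interval arrives one full period after subscription. The timeline helper is hypothetical.

```javascript
// Emission timelines implied by zip + interval: the n-th value (counting from 0)
// arrives (n + 1) * period milliseconds after subscription.
const timeline = (name, values, period) =>
  values.map((value, i) => ({ time: (i + 1) * period, name, value }));

const events = [
  ...timeline("a1", [2, 55], 1200),
  ...timeline("b1", [4, 20], 1500)
].sort((x, y) => x.time - y.time);

// combineLatest semantics: emit nothing until both streams have produced a value,
// then emit on every new value from either stream, using the latest of the other.
const latest = {};
const output = [];
for (const { time, name, value } of events) {
  latest[name] = value;
  if ("a1" in latest && "b1" in latest) {
    output.push({ time, c1: latest.a1 + latest.b1 });
  }
}

output.forEach(({ time, c1 }) => console.log(time + " ms: c1=" + c1));
// 1500 ms: c1=6
// 2400 ms: c1=59
// 3000 ms: c1=75
```

The three results match the values expected in the non-reactive snippet at the beginning of the post: 6, then 59 after a1 changes, then 75 after b1 changes. Nothing is printed at 1200 ms because b1 has not emitted yet.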

If you haven’t worked with reactive libraries, take a look at the one available for your programming language and start using it in a real-world project.

A word of caution. Rx libraries allow you to write less code, but the code readability suffers. The person who reads the code needs to know Rx as well.

On the positive side, Rx libraries don’t require you to change the architecture of the entire project. Use them where you can make async data flow through a sequence of algorithms (think operators).


More Stories By Yakov Fain

Yakov Fain is a Java Champion and a co-founder of the IT consultancy Farata Systems and the product company SuranceBay. He has written a thousand blog posts (http://yakovfain.com) and several books about software development. Yakov authored and co-authored such books as "Angular 2 Development with TypeScript", "Java 24-Hour Trainer", and "Enterprise Web Development". His Twitter tag is @yfain.
