RX: Reactive Libraries

Over the last several years, the term “reactive programming” has become popular in many programming languages. The Reactive Manifesto (http://www.reactivemanifesto.org) was published, although it gives a rather generic definition of reactive systems. Yes, an app should respond fast (Responsive), remain functional in case of errors (Resilient), adapt to increasing or decreasing computational resources (Elastic), and be based on asynchronous events (Message Driven).

Declaring the principles is a good start, but how do you apply them in a concrete app? Let’s talk about the reactive extensions libraries available for many programming languages.

About seven years ago, Erik Meijer from Microsoft created Reactive Extensions (Rx), a set of libraries (Rx.NET) for processing asynchronous event-driven data streams. For example, someone posts a tweet and you receive an immediate notification.

A non-reactive way of receiving tweets would be to visit twitter.com every now and then and reload the page, hoping that one of the people you follow has posted a new tweet. This is known as polling. The load on the server increases substantially if every user keeps polling it. The push model is a lot more efficient: just subscribe to the Twitter feed and get the tweets asynchronously pushed to you as they become available.

An online auction is another use case for the async data push. A user bids on a product, but other users may outbid her. Bids should be implemented as a stream that allows subscription, so the users won’t need to constantly check whether their offers are still the winning ones.

Another example is a stream of stock prices during the working hours of a stock exchange. Or take a stream of signals from a sensor (e.g. an accelerometer in your phone). Even the process of dragging the mouse over the screen can be treated as a stream of coordinates of the mouse pointer.

Five years ago, Microsoft released Rx.NET as an open source project. People liked it and the library was ported to other programming languages: RxCpp, RxJS, RxPHP, Rx.rb, Rx.py, RxJava, RxSwift, RxScala, RxKotlin.

Disclaimer. This post is not an Rx tutorial, but a brief introduction to the main Rx players. I work with RxJS and RxJava, but in this post I’ll be using code samples in JavaScript.

Let’s get familiar with the main concepts of Rx libraries, but first consider this non-reactive code:

int a1 = 2;
int b1 = 4;

int c1 = a1 + b1;  // c1 = 6

a1 = 55;           // c1 = 6, but should be 59
b1 = 20;           // c1 = 6, but should be 75

After the execution of this code, c1 is still equal to 6. Sure enough, we could add more code to recalculate c1 after the values of a1 and b1 change, but a better way to handle this is to have c1 recalculated as soon as either a1 or b1 changes, as in an Excel spreadsheet. In other words, it would be nice to switch to the push model, where new values are pushed asynchronously to their consumer. We want to move away from the pull model, where the consumer keeps asking the producer, “Do you have something new for me?… How about now?… Maybe now you have something?”
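To make the push idea concrete before bringing in Rx, here is a minimal sketch in plain JavaScript. The helper createCell() is hypothetical (it is not part of any Rx library); it just holds a value and pushes every change to the listeners registered with it.

```javascript
// Hypothetical helper (not an Rx API): a value holder that pushes
// every change to the listeners registered via onChange().
function createCell(initialValue) {
  let value = initialValue;
  const listeners = [];
  return {
    get: () => value,
    set(newValue) {
      value = newValue;
      listeners.forEach(listener => listener(value)); // push to consumers
    },
    onChange(listener) {
      listeners.push(listener);
    }
  };
}

const a1 = createCell(2);
const b1 = createCell(4);

let c1 = a1.get() + b1.get();   // c1 = 6
const c1History = [c1];         // keeps every value c1 has had

// The consumer never polls; it is notified when a1 or b1 changes.
const recalculate = () => {
  c1 = a1.get() + b1.get();
  c1History.push(c1);
};
a1.onChange(recalculate);
b1.onChange(recalculate);

a1.set(55);   // c1 is now 59
b1.set(20);   // c1 is now 75

console.log(c1History);   // [ 6, 59, 75 ]
```

This sketch is synchronous and has no notion of errors or of the end of the stream, which is exactly what the Rx players described next add.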

Observable, Observer, Subscriber

The main players of any Rx library are Observable, Observer, and Subscriber.

* Observable – an object or a function that emits sequences of data over time (a.k.a. producer)

* Observer – an object or a function that knows how to process the sequences of data (a.k.a. consumer)

* Subscriber – an object or a function that connects an Observable with Observer(s)

[Diagram: https://yakovfain.files.wordpress.com/2017/04/push.png]

After looking at this diagram, many software developers will say, “We already know this. This is pub-sub messaging with an implementation of the Observer pattern.” To some extent, this is correct, but there’s more to it:

1. Rx is meant for asynchronous, non-blocking data processing.

2. Rx offers a simple API with dedicated channels for sending data, errors, and the end-of-stream signal.

3. Any Rx library has about a hundred operators that can be applied to the data stream en route. Operators are easily composable.

4. Some of the Rx implementations (e.g. RxJava2) support backpressure well. Backpressure deals with the scenario where a producer emits data faster than a consumer can handle.

5. You don’t need special messaging servers to use an Rx library. Everything you need is a part of your app.

6. In languages that support multi-threading, working with threads as well as switching between them is easier. Android developers will appreciate this because the UI rendering has to be done in the main thread while the calculations run in others.

So how does an Observable send the data to the Observer? An Observer can implement three methods (their names may vary slightly depending on the language you use):

* next() – here’s a new value from the stream
* error() – an error occurred in the stream
* complete() – the stream’s over

In the next code sample, the function getData() turns the array of beers into an Observable and returns it. Returns to whom? To the code that invokes subscribe() on it. The subscriber, getData().subscribe(myObserver), passes an Observer as an argument to the function subscribe(). Accordingly, an Observer can implement three functions:

* Handling the next element from the stream
* Handling the stream error
* Handling the end of stream, if needed

// Defining the function that returns an Observable
function getData(){

    var beers = [
        {name: "Stella", country: "Belgium", price: 9.50},
        {name: "Sam Adams", country: "USA", price: 8.50},
        {name: "Bud Light", country: "USA", price: 6.50},
        {name: "Brooklyn Lager", country: "USA", price: 8.00},
        {name: "Sapporo", country: "Japan", price: 7.50}
    ];

    // The observer will be provided at the time of subscription
    return Rx.Observable.create( observer => {

        beers.forEach( beer => observer.next(beer));
        observer.complete();
    });
}

// Calling the function that subscribes to the Observable
// The function subscribe() receives the Observer, represented by three functions
getData()
     .subscribe(
         beer  => console.log("Subscriber got " + beer.name),  // handling the arrived data
         error => console.error(error),                        // an error arrived
            () => console.log("The stream is over")            // the signal that the stream completed arrived
);

Our Observer consists of three fat arrow functions (=>). This syntax was introduced in the ECMAScript 6 spec. Our fat arrow callbacks can be invoked only after we invoke subscribe(). You can see this code sample in action here (open the browser’s console and click on the Run button).
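The beer sample never triggers the error callback, so here is a minimal sketch, in plain JavaScript without Rx, of how the three channels relate. The function createObservable() is a hypothetical stand-in for Rx.Observable.create(), not the library’s code. It assumes the usual Rx contract: once error() or complete() has been called, nothing else is delivered to the Observer.

```javascript
// Hypothetical stand-in for Rx.Observable.create() (not the library's code):
// it stores the producer function and runs it on every subscribe() call.
function createObservable(producer) {
  return {
    subscribe(next, error, complete) {
      let finished = false;   // set once error() or complete() was delivered
      producer({
        next: value => { if (!finished) next(value); },
        error: err => { if (!finished) { finished = true; error(err); } },
        complete: () => { if (!finished) { finished = true; complete(); } }
      });
    }
  };
}

const log = [];

createObservable(observer => {
  observer.next("Stella");
  observer.error(new Error("Connection lost"));
  observer.next("Sapporo");   // ignored: the stream has already failed
  observer.complete();        // ignored as well
}).subscribe(
  beer  => log.push("Subscriber got " + beer),   // data channel
  error => log.push("Error: " + error.message),  // error channel
  ()    => log.push("The stream is over")        // end-of-stream channel
);

console.log(log);   // [ 'Subscriber got Stella', 'Error: Connection lost' ]
```

Note that the completion callback never runs here: a stream ends either with an error or with the end-of-stream signal, not both.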

Operators

Operators are functions that can transform the stream data between the moments when the Observable sent them and the function subscribe() received them. In other words, we can transform the data en route. Rx libraries have lots of operators.

[Diagram: https://yakovfain.files.wordpress.com/2017/04/operators.png]

Each operator is a function that takes an Observable as an argument, transforms (or ignores) it, and returns another Observable. Since the input and output of any operator have the same type, you can chain them up. Here’s how you can filter out the beer that’s more expensive than 8 dollars and convert the instances of the Beer object into strings.

[Image: https://yakovfain.files.wordpress.com/2017/04/filter_map.png]
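To make the chaining concrete, here is a dependency-free sketch of the idea that an operator takes an Observable and returns another Observable. The helpers fromArray(), filter(), and map() below are hypothetical, hand-written stand-ins and not the RxJS implementations; the label format is also my assumption. In RxJS 5 you would use the built-in filter and map operators instead, as shown in the comment at the end.

```javascript
// Hypothetical stand-in for an Rx Observable that emits the items of an array.
function fromArray(items) {
  return {
    subscribe(next, error, complete) {
      items.forEach(item => next(item));
      if (complete) complete();
    }
  };
}

// Each operator takes a source Observable and returns a new Observable.
const filter = predicate => source => ({
  subscribe: (next, error, complete) =>
    source.subscribe(value => { if (predicate(value)) next(value); }, error, complete)
});

const map = project => source => ({
  subscribe: (next, error, complete) =>
    source.subscribe(value => next(project(value)), error, complete)
});

const beers = [
  {name: "Stella", country: "Belgium", price: 9.50},
  {name: "Sam Adams", country: "USA", price: 8.50},
  {name: "Bud Light", country: "USA", price: 6.50},
  {name: "Brooklyn Lager", country: "USA", price: 8.00},
  {name: "Sapporo", country: "Japan", price: 7.50}
];

// Same type in, same type out, so the operators can be chained.
const cheapBeers = filter(beer => beer.price < 8)(fromArray(beers));
const beerLabels = map(beer => beer.name + ": $" + beer.price.toFixed(2))(cheapBeers);

const received = [];
beerLabels.subscribe(label => received.push(label));
console.log(received);   // [ 'Bud Light: $6.50', 'Sapporo: $7.50' ]

// With RxJS 5 the same chain would read:
// getData().filter(beer => beer.price < 8)
//          .map(beer => beer.name + ": $" + beer.price.toFixed(2))
//          .subscribe(label => console.log(label));
```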

Studying Rx operators requires a time investment, and I’m planning to write more about them. The Rx docs often include marble diagrams that help in understanding what a particular operator does. As an example, the marble diagram for the filter operator looks as follows:

[Marble diagram: https://yakovfain.files.wordpress.com/2017/04/marble.png]

On top, the incoming stream (an Observable) is represented by various geometrical shapes. Then the filter operator ignores every element but the circles, and the resulting Observable will contain only the circles. When you look at the circles, think of beers that are cheaper than eight dollars.

Still, how do we make c1 = a1 + b1 reactive?

First, convert a1 and b1 into streams, for example:

const a1 = Rx.Observable.from([2, 55]);

But this stream will shoot 2 and 55 instantaneously, so let’s add the time dimension. To emulate a delay you can use another stream that just emits sequential numbers and join it using the zip operator with the stream that emits 2 and 55:

const a1 = Rx.Observable.from([2, 55])
  .zip(Rx.Observable.interval(1200), x => x);

When someone subscribes to a1, it’ll emit 2 after 1.2 seconds and 55 another 1.2 seconds later, because interval(1200) produces its first value only after the first 1.2 seconds have passed. Similarly, let’s create a stream for b1 but with a delay of 1.5 seconds. Then, using stream composition and the operator combineLatest, we combine the streams a1 and b1 and add their latest values. The entire code will look as follows:

const a1 = Rx.Observable.from([2, 55])
  .zip(Rx.Observable.interval(1200), x => x);
  
const b1 = Rx.Observable.from([4, 20])
  .zip(Rx.Observable.interval(1500), x => x);

a1.combineLatest(b1, (x, y) => x + y)
  .subscribe(val => console.log("c1=" + val));

To see this code in action, visit the Plunker at http://bit.ly/2nphn0k, open the browser’s console, and click on the Run button. You’ll see how c1 is recalculated as soon as either a1 or b1 changes.
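If you’d rather trace the timing on paper, the following plain JavaScript walkthrough (no Rx involved) replays the emission schedule of the sample. It assumes that interval(1200) and interval(1500) produce their first values after 1.2 and 1.5 seconds respectively, and that combineLatest stays silent until each source has emitted at least once; the variable names are mine.

```javascript
// Emission schedule of the sample: [time in ms, stream name, value].
// Assumes interval(n) ticks first at n ms and then every n ms.
const emissions = [
  [1200, "a1", 2], [2400, "a1", 55],
  [1500, "b1", 4], [3000, "b1", 20]
].sort((x, y) => x[0] - y[0]);   // replay in chronological order

const latest = {};   // the most recent value seen from each stream
const output = [];

emissions.forEach(([time, stream, value]) => {
  latest[stream] = value;
  // Nothing is emitted until every source has produced at least one value.
  if ("a1" in latest && "b1" in latest) {
    output.push(time + " ms: c1=" + (latest.a1 + latest.b1));
  }
});

console.log(output);
// [ '1500 ms: c1=6', '2400 ms: c1=59', '3000 ms: c1=75' ]
```

The first value of a1 arrives at 1.2 seconds, but c1 is not reported until b1 delivers its first value at 1.5 seconds.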

[Screenshot: https://yakovfain.files.wordpress.com/2017/04/a1b1c1.png]

If you haven’t worked with reactive libraries, take a look at the one available for your programming language and start using it in a real-world project.

A word of caution. Rx libraries allow you to write less code, but the code readability suffers. The person who reads the code needs to know Rx as well.

On the positive side, Rx libraries don’t require you to change the architecture of the entire project. Use them where you can make the async data flow through a sequence of algorithms (think operators).


More Stories By Yakov Fain

Yakov Fain is a Java Champion and a co-founder of the IT consultancy Farata Systems and the product company SuranceBay. He wrote a thousand blogs (http://yakovfain.com) and several books about software development. Yakov authored and co-authored such books as "Angular 2 Development with TypeScript", "Java 24-Hour Trainer", and "Enterprise Web Development". His Twitter tag is @yfain
