At first sight, RxJS is blown up lodash but for dealing also with async. In reality, it’s so much more than that. With a few simple operators, you can implement a Redux-like state machine, schedule animation or deal with any type of events no matter whether it’s WebSocket message or filling in the text input.

Angular is armed with RxJS out of the box, but how about its great competitor? I like React so much not only for what it is but also for that what you can make of it. While preparing for a presentation about Reactive JavaScript at meet.js Summit I have been asked couple times how to integrate RxJS into an existing app. All in all, there is a small chance that you are allowed to rewrite the app in Cycle.js.

Update I’ve decided to do a full rewrite. From now on createState supports scoped reducers and few helpers were added. What’s more, state for connect is passed in Provider context like in Redux.

Reactive state

Redux does the job and it’s way simpler than its precursor. In real live, an app written with Flux has many stores, but it leads to unnecessary complexity and managing state dependencies. Redux with single store gained popularity really quickly. How about no store at all? With reactive state we don’t need passive store. We can replace createStore with createState. Observable state will update components calling setState, or better, with props.

// createState

function createState(reducer$, initialState$ = Rx.Observable.of({})) {
  return initialState$
    .merge(reducer$)
    .scan((state, [scope, reducer]) =>
      ({ ...state, [scope]: reducer(state[scope]) }))
    .publishReplay(1)
    .refCount();
}

Thanks to calling publishReplay, refCount we’ll share the same state among all observers. In RxJS 4 you could achieve the same with shareReplay.

test("createState creates reactive state using scoped reducers", (t) => {
  const add$ = new Rx.Subject();
  const counterReducer$ = add$.map(payload => state => state + payload);
  const rootReducer$ = counterReducer$.map(counter => ["counter", counter]);
  const state$ = createState(rootReducer$, Rx.Observable.of({ counter: 10 }));

  t.plan(1);

  add$.next(1); // No subscribers yet

  state$.toArray().subscribe((results) => {
    t.deepEqual(results, [{ counter: 10 }, { counter: 12 }]);
  });

  add$.next(2);
  add$.complete();
});

Actions, ActionCreators, Constants…

How about just actions? Subjects actually. Subject is both at the same time, observable and observer.

// counterActions

import { createActions } from "../state/RxState";
export default createActions(["increment", "decrement", "reset"]);

// which is the same as

export default {
  increment: new Rx.Subject,
  decrement: new Rx.Subject,
  reset: new Rx.Subject,
};

Normally it wouldn’t be the best choice to drop the idea of this separation. We can take advantage of consuming observables and using simple map operator and later change an action info a reducer. This eliminates the need for constants and many tests I don’t like the idea of testing basically syntax.

Reducer($)

This part actually is going to be significantly different from what you already know about reducers. In Redux, reducers are pure functions which return new state if they can handle particular actions.

// CounterReducer

import Rx from "rxjs";
import counterActions from "../actions/counterActions";

const initialState = 0;

const CounterReducer$ = Rx.Observable.of(() => initialState)
  .merge(
    counterActions.increment.map(payload => state => state + payload),
    counterActions.decrement.map(payload => state => state - payload),
    counterActions.reset.map(_payload => _state => initialState),
  );

export default CounterReducer$;

Reducers, CounterReducer$ actually, is a stream of lazy-evaluated, pure functions. We can take data carried by an action and change it into single, pure function.

import test from "ava";
import { pipe } from "ramda";
import counterActions from "../actions/counterActions";
import CounterReducer$ from "./CounterReducer";

test("handles increment, decrement and reset actions", (t) => {
  CounterReducer$.take(5).toArray().subscribe((fns) => {
    t.is(pipe(...fns)(), 9);
  });

  counterActions.increment.next(1);
  counterActions.reset.next();
  counterActions.increment.next(10);
  counterActions.decrement.next(1);
});

As you can see testing such reducer is really simple with little ramda help.

connect

Firstly, we don’t want to tight coupling components with any modules. Components props can be treated as a simple, but still powerful, dependency injection. The question is how to extract action creators (action subject in our case) so the implementation details won’t leak.

Redux already solved this problem with connect from react-redux. Our implementation will be simpler but yet fully functional for our reactive use case.

// connect

export function connect(selector = state => state, actionSubjects) {
  const actions = Object.keys(actionSubjects)
    .reduce((akk, key) => ({ ...akk, [key]: value => actionSubjects[key].next(value) }), {});

  return function wrapWithConnect(WrappedComponent) {
    return class Connect extends Component {
      static contextTypes = {
        state$: PropTypes.object.isRequired,
      };

      componentWillMount() {
        this.subscription = this.context.state$.map(selector).subscribe(::this.setState);
      }

      componentWillUnmount() {
        this.subscription.unsubscribe();
      }

      render() {
        return (
          <WrappedComponent {...this.state} {...this.props} {...actions} />
        );
      }
    };
  };
}

Connect component is wrapping component to pass state to props. selector is a deadly simple function to map global state to component props. actionSubjects is an object of subjects returned from createActions which hides the boilerplate of calling .next(), we don’t have to call dispatch.

test("connect maps state to props in Provider context", (t) => {
  const add$ = new Rx.Subject();
  const counterReducer$ = add$.map(payload => state => state + payload);
  const rootReducer$ = counterReducer$.map(counter => ["counter", counter]);
  const state$ = createState(rootReducer$, Rx.Observable.of({ counter: 10 }));

  const Counter = ({ counter, add }) => (
    <div>
      <h1>{counter}</h1>
      <button onClick={() => add(1)}>add</button>
    </div>
  );

  const ConnectedCounter = connect(state => ({ counter: state.counter }), { add: add$ })(Counter);

  const tree = mount(
    <Provider state$={state$}>
      <ConnectedCounter />
    </Provider>
  );

  t.is(tree.find("h1").text(), "10");
  tree.find("button").simulate("click");
  t.is(tree.find("h1").text(), "11");
});

Reactive components

import React from "react";
import PropTypes from "prop-types";
import { connect } from "../state/RxState";
import counterActions from "../actions/counterActions";

export const Counter = ({ counter, increment, decrement, reset }) => (
  <div>
    <h1>{counter}</h1>
    <hr />
    <button onClick={() => increment(1)} id="increment">+</button>
    <button onClick={() => increment(10)} id="increment10">+10</button>
    <button onClick={reset} id="reset">Reset</button>
    <button onClick={() => decrement(1)} id="decrement">-</button>
    <button onClick={() => decrement(10)} id="decrement10">-10</button>
  </div>
);

Counter.propTypes = {
  counter: PropTypes.number.isRequired,
  increment: PropTypes.func.isRequired,
  decrement: PropTypes.func.isRequired,
  reset: PropTypes.func.isRequired,
};

export default connect(({ counter }) => ({ counter }), counterActions)(Counter);

We wrapped Counter in higher-order components. The main advantage is that it then can be a stateless component which is a pure function. Pure functions are highly reusable, easy to test and maintain.

import React from "react";
import test from "ava";
import sinon from "sinon";
import { shallow } from "enzyme";
import { Counter } from "./Counter";

test("displays counter", (t) => {
  const increment = sinon.spy();
  const decrement = sinon.spy();
  const reset = sinon.spy();
  const tree = shallow(
    <Counter
      counter={123}
      increment={increment}
      decrement={decrement}
      reset={reset}
    />
  );
  t.is(tree.find("h1").text(), "123");
});

test("calls passed actions", (t) => {
  const increment = sinon.spy();
  const decrement = sinon.spy();
  const reset = sinon.spy();
  const tree = shallow(
    <Counter
      counter={123}
      increment={increment}
      decrement={decrement}
      reset={reset}
    />
  );
  tree.find("#increment").simulate("click");
  t.true(increment.calledWith(1));
  tree.find("#increment10").simulate("click");
  t.true(increment.calledWith(10));
  tree.find("#reset").simulate("click");
  t.true(reset.called);
  tree.find("#decrement").simulate("click");
  t.true(decrement.calledWith(1));
  tree.find("#decrement10").simulate("click");
  t.true(decrement.calledWith(10));
});

Async

How to handle AJAX calls? According to good practices from Flux or Redux the initial call to the server starts in the action creator. What do we do when we don’t have action creators? We can init the request in the reducer. While it doesn’t sound great keep in mind that we are dealing with one stream and whether an operation is async or not is irrelevant.

UserActions.fetch$.flatMap(userId => {
  return Rx.Observable.ajax(`/users/${userId}`)
});

Observables have many advantages over Promises and one of the greatest is the ability to retry.

UserActions.fetch$.concatMap(userId => {
  return Rx.Observable.ajax(`/users/${userId}`)
    .retryWhen(err$ => err$.delay(1000).take(10));
});

Summary

We have only scratched the surface of possible RxJS use cases in React. With even basic knowledge about RxJS observables and operators, you can do pretty awesome stuff in just couple lines of code.

GitHub: MichalZalecki/connect-rxjs-to-react