Java编程网

分享 Java Web 开发相关知识

使用基于React和RxJS的方法(带有redux-observable和websockets)处理财务数据。

本示例需要具备React和Redux配置的基础知识-如果您不熟悉这些概念,可以在此处查看一些示例。

我们将使用Finnhub API(免费访问,“用于股票,货币和加密货币的RESTful API”),以构建RxJS连接的基于反应的解决方案,该解决方案将提供一些实时财务数据。

我们还将使用Redux-observable,这是将React应用程序与RxJS magic连接的绝佳解决方案。Redux-observable使用所有RxJS功能作为Redux的中间件,Redux是JS应用程序的状态容器,主要与React一起使用。

RxJS和Redux-observale

RxJS是将ReactiveX模式引入JavaScript的库。ReactiveX模式也广泛用于不同的平台/语言,例如RxPY,RxJava,RX.rb …您可以在此处阅读有关所有平台的更多信息。
RxJS本身具有相当陡峭的学习曲线,但是它具有许多低级运算符,可帮助处理所有副作用和异步数据操作。

已经有一篇文章介绍了有关可观察性和RxJS的基础知识:

另一方面,Redux-observable是一个库,它允许将RxJS用作Redux的中间件,Rex是状态管理容器,广泛用于React-因此,基本上,RxJS可以在React中最需要的地方使用应用程序:它处理保留在应用程序状态内的所有异步数据。

基本设定

我们将构建一个简单的应用程序,该应用程序使用websocket与RXJS的连接来处理实时数据(并显示股票代号的当前值)。

首先,我们需要创建包含基本目录actionsreducers目录的基本react + redux项目结构,并使用react-redux连接存储<Provider>

import { Provider } from "react-redux";

function App() {
  return (
    <Provider store={store}>
      <div className="App">
         <Repositories />
      </div>
    </Provider>
  );
}

请检查App.js上面示例中的文件。

创建基本项目结构后,我们需要连接可观看redux的中间件:

import { createStore, applyMiddleware } from "redux";
import { createEpicMiddleware, combineEpics } from "redux-observable";

const epicMiddleware = createEpicMiddleware();
const store = createStore(
  reducer,
  { isLoading: false, isError: false, repositories: [] },
  applyMiddleware(epicMiddleware)
);

export const rootEpic = combineEpics(actions.stockDataEpic);

rootEpic对象是与rootReducerredux中的对象相似的概念。为了创建它,我们需要使用combineEpics函数,就像combineReducers在redux中使用函数一样。

下一步是运行我们的中间件:

epicMiddleware.run(rootEpic);

现在,我们可以转到actions/index.js包含动作创建者的文件:

export function openStockStream(ticker) {
  return {
    type: START_STREAM,
    ticker
  };
}

export function getDataStop() {
  return {
    type: GET_DATA_CANCEL
  };
}

export function getDataDone(data) {
  return {
    type: GET_DATA_DONE,
    payload: {
 // data retuned in this format from Finnhub websocket
      [data.data[0].s]: data.data[0]
    }
  };
}

export function getDataFailed(error) {
  return {
    type: GET_DATA_FAILED,
    payload: error
  };
}

创建史诗

一个史诗是核心原始Redux的可观测的。

该函数需要一系列操作并返回一系列操作。采取行动,采取行动

终极版,可观察到的文档

这是Redux-observable提供的基本功能,它使我们可以连接RxJS observable并在action函数中对其进行处理。它需要一个动作流(请注意,它是按照惯例用$符号编写的:action$-这种表示法告诉我们这是流本身)。

import { ofType } from 'redux-observable';

...

export const stockDataEpic = action$ => {
  return action$.pipe(
    ofType(START_STREAM),

运算符ofType允许我们过滤要在此处使用的确切操作类型。

下一步是配置套接字连接:

import { webSocket } from "rxjs/webSocket";

const FINNHUBKEY = //Finnhub key;
const socket = webSocket(`wss://ws.finnhub.io?token=${FINNHUBKEY}`);

所以现在Epic可以处理websocket连接了,看起来像这样:

export const stockDataEpic = action$ => {
  return action$.pipe(
    ofType(START_STREAM),
    mergeMap(action =>
      socket
        .multiplex(
          () => ({ type: "subscribe", symbol: action.ticker }),
          () => ({ type: "unsubscribe", symbol: action.ticker }),
          msg => msg.type === "trade" && msg.data[0].s === action.ticker
        )
        .pipe(
          map(response => getDataDone(response)),
          catchError(error => {
            console.log("err:", error);
            return of(getDataFailed("Connection error!"));
          }),
          takeUntil(action$.pipe(ofType(GET_DATA_CANCEL)))
        )
    )
  );
};

Multiplex是一个特定的运算符,用于WebSocketSubject模拟打开多个套接字连接,而实际上仅维护一个。它包含三个参数:

  1. 订阅消息功能
  2. 退订消息功能
  3. 过滤器功能-如果将true消息沿着流传递,否则将被跳过

有关信息的更多信息RxJS Websockets

另一件事是pipe运营商,可以帮助理清不同运营商的使用,同时解决了反应-在这里我们使用的只是基本三:mapcatchErrortakeUntil
https://rxjs-dev.firebaseapp.com/guide/operators

map运算符处理的动作由reducer调度和处理:

import * as actions from '../actions';

export const reducer = (state, action) => {
  switch (action.type) {
    case actions.GET_DATA_REQUESTED:
      return { ...state, isLoading: true};
    case actions.GET_DATA_DONE:
      return { ...state, isLoading: false, repositories: { ...state.repositories, ...action.payload }, isCancelled: false };
action.payload.response, isCancelled: false };
    case actions.GET_DATA_FAILED:
      return { ...state, isLoading: false, isError: true, error: action.payload }
    case actions.GET_DATA_CANCEL:
      return { ...state, isLoading: false, isCancelled: true }
    default:
      return state;
  }
};

订阅股票代码

就是这样了!现在,回到App.js文件中,在组件的打开生命周期方法中,我们可以使用动态Epic一次订阅多个符号:

componentDidMount() {
    this.openBasicStocks();
  }

  openBasicStocks = () => {
    const { openStockStream } = this.props;
    ["AAPL", "NFLX", "FB", "AMZN", "GOOGL", "BINANCE:BTCUSDT"].forEach(s =>
      openStockStream(s)
    );
  };

这是websocket和redux-observable的基本设置。在这个级别上,它不是很复杂,但是使用更复杂的RxJS运算符处理更复杂的数据可能需要更多的时间来学习。尽管如此,RxJS是一个很棒的工具,并且redux-observable允许我们在React应用程序中使用所有功能。
请注意,有些方法并非直接来自redux-observablepackage,而需要直接从导入rxjs,因此我们的package.json文件需要包含两个库:

  "dependencies": {
    ...
    "redux-observable": "^1.2.0",
    "rxjs": "^6.5.3"
  },

如果我在这里可以改善任何信息,请随时与我联系。
编码愉快!

点赞

发表评论

电子邮件地址不会被公开。 必填项已用*标注