
RxJS-Ext


What's RxJS-Ext

RxJS Extensions is an extension library built on RxJS ^6.0. It provides additional observables and operators that help developers write programs quickly, especially for data analysis.

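  • rxjs-ext: the core library
  • rxjs-ext/configuration: a configuration tool that generates a dependency table from the observables and operators of the core library and of the official RxJS library
  • rxjs-ext/compile: a configuration compiler that uses the dependency table to turn a YAML configuration into an RxJS program
  • dmr-cli: an online YAML editor that combines configurations conforming to rxjs-ext-config with the dependency table to generate and preview RxJS programs
  • dmr: an example project in which rxjs-ext-config drives the core library to meet various business requirements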

Hello RxJS

If you have no background in RxJS programming, the simple demo below introduces the basics. If you already know RxJS, skip ahead to Usage.

import { of } from "rxjs";
import { map } from "rxjs/operators";

const source = of(1, 2, 3).pipe(
  map((value) => value * 2),
);

source.subscribe(
  (value) => console.log(value),
  (err) => console.log(err),
  () => console.log("complete"),
);

// 2, 4, 6, complete

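The program above processes data and prints a "complete" message when it finishes, which is the standard flow of an RxJS program. RxJS has a few main concepts: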

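  • observable: the object being observed; it supplies the data to process
  • operator: roughly a function; it processes the data
  • subscriber: usually defines three callbacks, which handle each value, errors, and completion respectively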

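In the program above, of(1, 2, 3) creates an observable. An observable has a pipe(...operators) method that accepts any number of operators, such as map, to process the data. Applying operators to an observable returns a new observable, so source is also an observable. Finally, subscribe is called on the observable with a subscriber (anonymous here) that specifies the callbacks.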

observable.pipe(operator, operator, operator, operator)

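The RxJS programming model is simple and explicit, and the official library already provides many operators; RxJS-Ext adds more operators so that data can be reshaped even faster.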
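For readers new to the pattern, the following dependency-free sketch models the pipe idea with plain arrays: an operator is a function from one list of values to another, and pipe applies operators from left to right. It is only an analogy; real RxJS operators work on Observables, handle values that arrive over time, and do nothing until subscribe is called. pipeArray, mapOp and filterOp are hypothetical names.

```typescript
// In this sketch an "operator" is a function from one list of values to another.
type Op<A, B> = (input: A[]) => B[];

// Apply operators left to right, the order in which observable.pipe(op1, op2, ...) applies them.
function pipeArray(source: any[], ...ops: Array<Op<any, any>>): any[] {
  return ops.reduce<any[]>((values, op) => op(values), source);
}

// Hypothetical array counterparts of the RxJS map and filter operators.
function mapOp<A, B>(fn: (value: A) => B): Op<A, B> {
  return (input) => input.map(fn);
}
function filterOp<A>(predicate: (value: A) => boolean): Op<A, A> {
  return (input) => input.filter(predicate);
}

// Same shape as the Hello RxJS demo: of(1, 2, 3).pipe(map((value) => value * 2))
const doubled = pipeArray([1, 2, 3], mapOp((value: number) => value * 2));

// Operators chain: each one receives the output of the previous one.
const doubledOver2 = pipeArray(
  [1, 2, 3],
  mapOp((value: number) => value * 2),
  filterOp((value: number) => value > 2),
);
// doubled: [2, 4, 6]; doubledOver2: [4, 6]
```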

Usage

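Assuming you now understand the RxJS programming model, the following shows how the observables and operators provided by RxJS-Ext can be used to build a log-analysis example quickly.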

Observable

RxJS-Ext wraps three observables for fetching data over FTP, over HTTP, and from local files. For example:

const file = "./local.log";
const data0 = fromHttp("http://localhost:8111/api");
const data1 = fromFtp({host: "127.0.0.1", path: "/err", port: 8880});
data0.pipe(shuntFile(file)).subscribe(() => null);

In the example above, fromHttp fetches the data and the shuntFile operator streams it into file.

To read this file back as a stream, just use fromFile:

fromFile(file).subscribe(console.log);

Operator

The following example reads a local log, processes the data, and stores the result:


// logFile is an nginx log file; combined is the parsing template
const combined = `$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"`;

const params = fromFile(logFile).pipe(
  line(),   // streamed data must be split into lines ("\n") with line
  deformat(combined),
  // parsed into {remote_addr: string, remote_user: string, ...}
  pluck("request"),
  // select the request field, which contains the HTTP URL
  matchAll(/(\w+)=(\w+)/g),
  // match key=value strings with the regexp: "load=1200", "net=400" ...
  split("="),
  pair(),
  // after splitting, pair turns them into key/value pairs: ["load", "1200"], ["net", "400"] ...
  selectPair("load", "net", /ts_/g),
  // keep only the fields of interest
  csv(),
  shuntFile("./newFile.csv"), // convert to CSV and write to a file
);

The line, deformat, matchAll, and pair operators used above make data analysis much more efficient; see the API documentation for more operators.
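To show what the matchAll, split, pair and selectPair steps do to a single request field, here is a plain-TypeScript approximation based on the comments in the example. The helper names are hypothetical, and treating the trailing /ts_/g argument of selectPair as a key pattern is an assumption, since the documented overloads only list string keys.

```typescript
type Pair = [string, string];

// Emit every full match of the regexp, like matchAll(/(\w+)=(\w+)/g) in the example above.
function matchAllSketch(text: string, regexp: RegExp): string[] {
  // Work on a fresh global copy so the caller's regexp state (lastIndex) is never touched.
  const flags = "g" + (regexp.ignoreCase ? "i" : "") + (regexp.multiline ? "m" : "");
  const re = new RegExp(regexp.source, flags);
  const out: string[] = [];
  let m: RegExpExecArray | null;
  while ((m = re.exec(text)) !== null) {
    out.push(m[0]);
    if (m[0] === "") re.lastIndex++; // step past empty matches to avoid looping forever
  }
  return out;
}

// split("=") followed by pair(): "load=1200" -> ["load", "1200"].
function toPair(text: string, separator: string): Pair {
  const parts = text.split(separator);
  return [parts[0], parts.length > 1 ? parts[1] : ""];
}

// Keep pairs whose key equals one of the given names or matches one of the given regexps.
function selectPairsSketch(pairs: Pair[], ...keys: Array<string | RegExp>): Pair[] {
  return pairs.filter(([key]) =>
    keys.some((k) =>
      // Rebuild regexps from their source (dropping flags such as g): test() on a global regexp is stateful.
      typeof k === "string" ? k === key : new RegExp(k.source).test(key),
    ),
  );
}

const request = "GET /api/report?load=1200&net=400&ts_dns=30&ua=x HTTP/1.1";
const pairs = matchAllSketch(request, /(\w+)=(\w+)/g).map((s) => toPair(s, "="));
const selected = selectPairsSketch(pairs, "load", "net", /ts_/g);
// selected: [["load", "1200"], ["net", "400"], ["ts_dns", "30"]]
```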

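Sometimes log processing is more complex and cannot be expressed as a single chain of pipe calls. In such cases you can use RxJS multicasting (Subject). In the example below, besides extracting fields from the URL, we also extract information from the user agent (UA) and store everything in the same table.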


// logFile is an nginx log file; combined is the parsing template
const logJSON = fromFile(logFile).pipe(
  line(),
  deformat(combined),
  publish(), // multicast: the branches below share this single stream as logJSON
);

const params = logJSON.pipe(
  pluck("request"),
  matchAll(/(\w+)=(\w+)/g),
  split("="),
  pair(),
);
const ua = logJSON.pipe(
  pluck("http_user_agent"),
  uaInfo("sys", "browser", "device"),
  // ["sys", "ios"], ["browser", "safari"], ["device", "Huawei"]
);

// merge the processed params and ua streams at the pace of logJSON, then write them to a file
of(params, ua).pipe(
  mergeBuffer(logJSON),
  csv(),
  shuntFile("./newFile.csv"),
).subscribe(); // nothing runs until the pipeline is subscribed

This completes a more complex data-processing pipeline.
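The comment on mergeBuffer says the two branch streams are merged at the pace of logJSON. One way to picture this without RxJS is a function that, for each log record, runs every branch and concatenates their outputs into a single row. This is a conceptual sketch only: how mergeBuffer actually buffers and aligns emissions is not documented here, and mergePerRecord, paramsBranch and the much simplified uaBranch are hypothetical.

```typescript
type Pair = [string, string];
type LogRecord = { [field: string]: string };

// A branch turns one parsed log record into zero or more pairs (like params or ua above).
type Branch = (record: LogRecord) => Pair[];

// For each record (one "pulse" of logJSON), run every branch and merge the outputs into one row.
function mergePerRecord(records: LogRecord[], branches: Branch[]): Pair[][] {
  return records.map((record) => {
    const row: Pair[] = [];
    for (const branch of branches) row.push(...branch(record));
    return row;
  });
}

// Hypothetical branch: key/value pairs from the request URL.
const paramsBranch: Branch = (record) => {
  const out: Pair[] = [];
  const re = /(\w+)=(\w+)/g;
  let m: RegExpExecArray | null;
  while ((m = re.exec(record.request)) !== null) out.push([m[1], m[2]]);
  return out;
};

// Hypothetical, heavily simplified stand-in for uaInfo: only distinguishes iOS from other systems.
const uaBranch: Branch = (record) => [
  ["sys", /iPhone|iPad/.test(record.http_user_agent) ? "ios" : "other"],
];

const rows = mergePerRecord(
  [
    { request: "GET /a?load=1200&net=400 HTTP/1.1", http_user_agent: "Mozilla/5.0 (iPhone; CPU iPhone OS 12_0)" },
    { request: "GET /b?load=900 HTTP/1.1", http_user_agent: "Mozilla/5.0 (Windows NT 10.0)" },
  ],
  [paramsBranch, uaBranch],
);
// rows[0]: [["load", "1200"], ["net", "400"], ["sys", "ios"]]
// rows[1]: [["load", "900"], ["sys", "other"]]
```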

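RxJS-Ext also provides computational operators that do Map and Reduce work, such as computing the mean, 80th percentile, and distribution of each field under given conditions; see Advanced Techniques for details.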

Config

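The rxjs-ext library can greatly improve the efficiency of data programming, but its goal goes further: we want programs to be written or generated in a more concise way, and rxjs-ext-config is an intermediate step toward that.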

For example, the previous example can be written as the following YAML configuration, from which the code is generated:

combined: '`$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"`'

origin:
  - fromFile: ./nginx.log
  - line:
  - deformat: combined

params:
  - origin:
  - pluck: request
  - matchAll: /(\w+)=(\w+)/g
  - split: =
  - pair:

ua:
  - origin:
  - pluck: http_user_agent
  - uaInfo: [sys, browser, device]

export:
  - mergeBuffer:
    - params:
    - ua:
    - origin:
  - csv:
  - shuntFile: ./newFile.csv

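With rxjs-ext/compile, the configuration above can be compiled into a TypeScript program or checked for syntax errors; compile tools for other languages (Python, Scala, C++) are planned. As this shows, rxjs-ext makes it possible to drive business logic from a language-independent configuration. The configuration itself is easy to read, and combined with the CLI tool it can serve the data-mining needs of people in different roles.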
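The compile step itself is not shown in this document. As a rough illustration of the idea, the sketch below takes step lists shaped like the YAML above (already parsed into objects), looks each operator name up in a registry, and chains the stages. The registry entries are simplified array-based stand-ins for line, matchAll, split and pair, not the rxjs-ext implementations, and compileSteps and parseRegExp are hypothetical names.

```typescript
// A parsed YAML step such as `- split: =` becomes { split: "=" }; `- line:` becomes { line: null }.
type Step = { [name: string]: any };
type Stage = (values: any[]) => any[];
type Factory = (arg: any) => Stage;

// Parse "/source/flags" strings, as in `matchAll: /(\w+)=(\w+)/g`.
function parseRegExp(text: string): RegExp {
  const m = /^\/([\s\S]*)\/([gimuy]*)$/.exec(text);
  if (m === null) throw new Error(`not a regexp literal: ${text}`);
  return new RegExp(m[1], m[2]);
}

// Hypothetical array-based stand-ins for a few rxjs-ext operators.
const registry: { [name: string]: Factory } = {
  line: () => (chunks) => chunks.join("").split("\n").filter((l: string) => l !== ""),
  matchAll: (source: string) => (texts) => {
    const out: string[] = [];
    for (const text of texts) {
      const re = parseRegExp(source);
      let m: RegExpExecArray | null;
      while ((m = re.exec(text)) !== null) {
        out.push(m[0]);
        if (!re.global) break; // without g, exec would return the same match forever
        if (m[0] === "") re.lastIndex++; // step past empty matches
      }
    }
    return out;
  },
  split: (separator: string) => (texts) => texts.map((t: string) => t.split(separator)),
  pair: () => (parts) => parts.map((p: string[]) => [p[0], p[1]]),
};

// Look each step up in the registry and chain the resulting stages left to right.
function compileSteps(steps: Step[]): Stage {
  const stages = steps.map((step) => {
    const names = Object.keys(step);
    if (names.length !== 1) throw new Error("each step must have exactly one operator");
    const factory: Factory | undefined = registry[names[0]];
    if (factory === undefined) throw new Error(`unknown operator: ${names[0]}`);
    return factory(step[names[0]]);
  });
  return (values) => stages.reduce<any[]>((acc, stage) => stage(acc), values);
}

// Roughly the `params` section above, minus the origin and pluck steps.
const params = compileSteps([
  { line: null },
  { matchAll: "/(\\w+)=(\\w+)/g" },
  { split: "=" },
  { pair: null },
]);
const result = params(["load=1200&net=400\nload=900\n"]);
// result: [["load", "1200"], ["net", "400"], ["load", "900"]]
```

Keeping the operator table separate from the step list is what lets the same configuration be checked or compiled against a dependency table without running it.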

API

Index

Type aliases

BooleanExpr

BooleanExpr: function

Type declaration

    • (scope: object): boolean
    • Parameters

      • scope: object
        • [index: string]: any

      Returns boolean

BooleanFunction

BooleanFunction: string

ConfigItem

ConfigItem: any | any[] | string | number | Expression | Config

ExprLike

ExprLike: string

Expression

Expression: expression.BooleanExpr | expression.StringExpr | expression.NumberExpr

NumberExpr

NumberExpr: function

Type declaration

    • (scope: object): number
    • Parameters

      • scope: object
        • [index: string]: any

      Returns number

OptionItem

OptionItem: any | string | number | boolean | Option | OptionItems

OptionItems

OptionItems: any[] | string[] | number[] | boolean[] | Option[]

Pair

Pair: [string, string]

ParameterConfig

ParameterConfig: string

RecodeTypes

RecodeTypes: "encodeURIComponent" | "encodeURI" | "decodeURIComponent" | "decodeURI"

Scope

Scope: object

Type declaration

StringExpr

StringExpr: function

Type declaration

    • (scope: object): string
    • Parameters

      • scope: object
        • [index: string]: any

      Returns string

StringFunction

StringFunction: function

Type declaration

    • (...args: string[]): string | boolean | number
    • Parameters

      • Rest ...args: string[]

      Returns string | boolean | number

StringTemplate

StringTemplate: string

Variables

Const compiler

compiler: Compiler = new Compiler()

Const conf

conf: any = yaml.safeLoad(str)
  • lev0
    • startWithout$
      • object/array -> next level + recursion
    • startWith$
      • first: observable: startWith$ / operator + parameter
      • following:
        • each

Const config

config: any = yaml.safeLoad(FS.readFileSync(path.resolve(dir, "configuration.yml"), "utf8"))

Const dir

dir: string = path.resolve(__dirname, "../../../")

Const files

files: string[] = []

Const fs

fs: FileSystemHost = project.getFileSystem()

Const funcs

funcs: any[] = []

Const project

project: Project = new Project({
  addFilesFromTsConfig: false,
  manipulationSettings: {
    // TwoSpaces, FourSpaces, EightSpaces, or Tab
    indentationText: IndentationText.FourSpaces,
    // LineFeed or CarriageReturnLineFeed
    newLineKind: NewLineKind.LineFeed,
    // Single or Double
    quoteKind: QuoteKind.Double,
  },
  tsConfigFilePath: path.resolve(dir, config.tsconfig),
  // useVirtualFileSystem: true,
})

Const rxjsExt

rxjsExt: any = require("../../../dist/configurable.json")

Const sourceFile

sourceFile: SourceFile = project.createSourceFile("file.ts", "console.log(5);const sss = \"haha\";console.log(sss);")

Const sourceFiles

sourceFiles: SourceFile[] = project.addExistingSourceFiles(files)

Const str

str: string = `scope:
  combined: sssss
source:
  - fromHttp: "https://www.baidu.com/"
  - line:
start:
  - source:`

Functions

applyArray

  • applyArray(target: OptionItems[], scope: object): any[]
  • Parameters

    • target: OptionItems[]
    • scope: object
      • [index: string]: any

    Returns any[]

applyItem

  • Parameters

    • item: ConfigItem
    • scope: object
      • [index: string]: any

    Returns any

applyObject

  • applyObject(obj: object, scope: object): Option
  • Parameters

    • obj: object
    • scope: object
      • [index: string]: any

    Returns Option

bufferBreak

  • bufferBreak(breaker?: string): OperatorFunction<Buffer, Buffer>
  • Parameters

    • Default value breaker: string = ""

    Returns OperatorFunction<Buffer, Buffer>

deformat

  • deformat(combined: string): OperatorFunction<string, object>
  • deformat provides an operator that splits each value according to the format template and emits an object of type {[index: string]: string}. It is usually used to parse server logs.

    example
    
    const combined = `$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"`;
    const log = `192.168.203.111 - - [03/Dec/2014:22:07:37 -0800] "GET /api/foo/bar?key=value&key=has space&key has \x22&key2=var2 HTTP/1.1" 404 576 "-" "Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.71 Safari/537.36"`;
    
    of(log).pipe(deformat(combined))
     .subscribe((v) => console.log(v));
    // {remote_addr: "192.168.203.111", remote_user: ...}

    Parameters

    • combined: string

    Returns OperatorFunction<string, object>
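deformat's behaviour is only described at a high level above. A minimal, dependency-free sketch of the same idea compiles the template into a regular expression, assuming that each $name captures everything up to the next literal part of the template. deformatSketch is a hypothetical name; the real operator may handle escaping, missing fields, or malformed lines differently.

```typescript
// Compile a log-format template (e.g. nginx "combined") into a line parser.
// Each $name becomes a lazy capture group; all other characters must match literally.
function deformatSketch(template: string): (logLine: string) => { [index: string]: string } | null {
  const names: string[] = [];
  const escapeRe = (s: string) => s.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
  const varRe = /\$(\w+)/g;
  let pattern = "^";
  let last = 0;
  let m: RegExpExecArray | null;
  while ((m = varRe.exec(template)) !== null) {
    pattern += escapeRe(template.slice(last, m.index)) + "(.*?)";
    names.push(m[1]);
    last = m.index + m[0].length;
  }
  pattern += escapeRe(template.slice(last)) + "$";
  const re = new RegExp(pattern);

  return (logLine) => {
    const match = re.exec(logLine);
    if (match === null) return null; // the line does not follow the template
    const fields: { [index: string]: string } = {};
    for (let i = 0; i < names.length; i++) fields[names[i]] = match[i + 1];
    return fields;
  };
}

const combined = `$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"`;
const parse = deformatSketch(combined);
const parsed = parse(`127.0.0.1 - - [03/Dec/2014:22:07:37 -0800] "GET /api?load=1200 HTTP/1.1" 200 512 "-" "curl/7.58.0"`);
// parsed.remote_addr === "127.0.0.1", parsed.request === "GET /api?load=1200 HTTP/1.1", parsed.status === "200"
```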

filterPair

  • filterPair(func: function): OperatorFunction<Pair, Pair>
  • export
    operators

    Parameters

    • func: function
        • (idx: string, val: string): boolean
        • Parameters

          • idx: string
          • val: string

          Returns boolean

    Returns OperatorFunction<Pair, Pair>
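Judging from its signature, filterPair keeps a Pair when func(idx, val) returns true. A plain-array sketch of that predicate contract, with filterPairsSketch as a hypothetical name:

```typescript
type Pair = [string, string];

// Keep only the pairs for which func(idx, val) returns true; idx is the key, val the value.
function filterPairsSketch(pairs: Pair[], func: (idx: string, val: string) => boolean): Pair[] {
  return pairs.filter(([idx, val]) => func(idx, val));
}

const kept = filterPairsSketch(
  [["load", "1200"], ["net", "n/a"], ["dns", "30"]],
  (_idx, val) => /^\d+$/.test(val), // keep pairs whose value is an integer
);
// kept: [["load", "1200"], ["dns", "30"]]
```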

fromFile

  • configuration

    Parameters

    Returns Observable<Buffer>

fromFtp

  • configuration

    Parameters

    Returns Observable<Buffer>

fromHttp

  • fromHttp(reqOption: string | ClientRequestArgs, resOption?: ResponseOption): Observable<Buffer>
  • configuration

    Parameters

    • reqOption: string | ClientRequestArgs
    • Default value resOption: ResponseOption = {}

    Returns Observable<Buffer>

isExprLike

  • isExprLike(str: string): false | true | RegExpMatchArray
  • Parameters

    • str: string

    Returns false | true | RegExpMatchArray

json

  • json<T>(lazy?: boolean): OperatorFunction<string, T>
  • Type parameters

    • T

    Parameters

    • Default value lazy: boolean = false

    Returns OperatorFunction<string, T>

jsonLazy

  • jsonLazy<T>(): OperatorFunction<string, object>
  • Type parameters

    • T

    Returns OperatorFunction<string, object>

line

  • line<T>(breaker?: string): OperatorFunction<T, string>
  • configuration

    Type parameters

    • T: Buffer | string

    Parameters

    • Default value breaker: string = ""

    Returns OperatorFunction<T, string>
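Splitting a stream into lines is less trivial than calling split once, because a line can be cut across two chunks. A minimal sketch of the buffering involved, assuming string chunks and "\n" as the separator (the API lists the default breaker as an empty string, whose meaning is not documented here); createLineSplitter is hypothetical and not the rxjs-ext implementation. With Buffer input, a multi-byte UTF-8 character can also be split across chunks, which this string-only sketch does not handle.

```typescript
// Stateful line splitter: feed it chunks in arrival order; it returns the complete lines
// and keeps any trailing partial line until the next chunk (or flush) arrives.
function createLineSplitter(breaker: string = "\n") {
  let rest = "";
  return {
    push(chunk: string): string[] {
      const parts = (rest + chunk).split(breaker);
      rest = parts.pop() as string; // the last piece may be an incomplete line
      return parts;
    },
    flush(): string[] {
      const remaining = rest;
      rest = "";
      return remaining === "" ? [] : [remaining];
    },
  };
}

const splitter = createLineSplitter();
const lines = [
  ...splitter.push("GET /a?load=1\nGET /b?lo"), // "GET /b?lo" is held back
  ...splitter.push("ad=2\nGET /c"),             // completes "GET /b?load=2"
  ...splitter.flush(),                          // the source completed: emit "GET /c"
];
// lines: ["GET /a?load=1", "GET /b?load=2", "GET /c"]
```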

mapFile

  • mapFile(): OperatorFunction<string | FileReadOption, Observable<Buffer>>
  • Returns OperatorFunction<string | FileReadOption, Observable<Buffer>>

mapFtp

  • Returns OperatorFunction<FtpReadOption, Observable<Buffer>>

mapHttp

  • mapHttp<T>(): OperatorFunction<T, Observable<Buffer>>
  • Type parameters

    • T: string | ClientRequestArgs

    Returns OperatorFunction<T, Observable<Buffer>>

match

  • match(regexp: RegExp): OperatorFunction<string, string[]>
  • Parameters

    • regexp: RegExp

    Returns OperatorFunction<string, string[]>

matchAll

  • matchAll(regexp: RegExp): OperatorFunction<string, string>
  • Parameters

    • regexp: RegExp

    Returns OperatorFunction<string, string>

mergeBuffer

  • mergeBuffer<T>(pulsar: ConnectableObservable<any>): OperatorFunction<Observable<T>, T[]>
  • Type parameters

    • T

    Parameters

    • pulsar: ConnectableObservable<any>

    Returns OperatorFunction<Observable<T>, T[]>

modify

  • configuration

    Parameters

    Returns OperatorFunction<string, string>

  • Parameters

    Returns OperatorFunction<string, string>

modifyPair

modifyPairInPairs

  • modifyPairInPairs(targetIndex: string | RegExp, newValueExpr: string, newIndexExpr?: string, type?: string): OperatorFunction<Pair[], Pair[]>
  • Parameters

    • targetIndex: string | RegExp
    • newValueExpr: string
    • Optional newIndexExpr: string
    • Optional type: string

    Returns OperatorFunction<Pair[], Pair[]>

modifyPairWith

  • modifyPairWith(indexOperation?: OperatorFunction<string, string>, valueOperation?: OperatorFunction<string, string>): OperatorFunction<Pair, Pair>
  • Parameters

    • Optional indexOperation: OperatorFunction<string, string>
    • Optional valueOperation: OperatorFunction<string, string>

    Returns OperatorFunction<Pair, Pair>

observable

  • observable(val: any, scope: Scope): void
  • Parameters

    • val: any
    • scope: Scope

    Returns void

option

  • option<T, R>(config: R): OperatorFunction<T, R>
  • Applies config with the environment (scope) emitted by the source Observable, and emits the resulting option as an Observable.

    example
    
    of({root: __dirname}).pipe(option("`${root}`"));
    of(1, 2, 3).pipe(map((n) => ({root: String(n)})), option("`${root}`"));

    Type parameters

    • T

    • R

    Parameters

    • config: R

    Returns OperatorFunction<T, R>
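option's example passes the string "`${root}`", and the type aliases above describe expressions as functions from a scope object to a value (StringExpr, NumberExpr, BooleanExpr). A minimal sketch of how a backtick-wrapped config string could be turned into a StringExpr, assuming backticks mark an expression and only plain ${name} placeholders need support; resolveTemplate is a hypothetical name, not the library's isExprLike or resolveString, and the real implementation may evaluate richer expressions.

```typescript
// Mirrors the StringExpr and Scope shapes listed in the API section.
type Scope = { [index: string]: any };
type StringExpr = (scope: Scope) => string;

// Hypothetical resolver: a config string wrapped in backticks is an expression;
// ${name} placeholders inside it are looked up in the scope when the expression runs.
function resolveTemplate(config: string): string | StringExpr {
  const m = /^`([\s\S]*)`$/.exec(config);
  if (m === null) return config; // not expression-like: use the string as-is
  const body = m[1];
  return (scope) =>
    body.replace(/\$\{(\w+)\}/g, (_match: string, key: string) => {
      if (!(key in scope)) throw new Error(`undefined in scope: ${key}`);
      return String(scope[key]);
    });
}

// Like option("`${root}/logs`") applied to a value { root: "/var/app" } from the source.
const expr = resolveTemplate("`${root}/logs`");
const resolved = typeof expr === "function" ? expr({ root: "/var/app" }) : expr;
// resolved === "/var/app/logs"
```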

pair

  • pair(index?: number | string, value?: number | string): OperatorFunction<string | string[], Pair>
  • Parameters

    • Optional index: number | string
    • Optional value: number | string

    Returns OperatorFunction<string | string[], Pair>

pairFromJson

  • pairFromJson(index: string): OperatorFunction<object, Pair>
  • pairFromJson(indexExprLike: string, valueExprLike: string): OperatorFunction<object, Pair>
  • Parameters

    • index: string

    Returns OperatorFunction<object, Pair>

  • Parameters

    • indexExprLike: string
    • valueExprLike: string

    Returns OperatorFunction<object, Pair>

pairIterable

  • pairIterable<T>(): OperatorFunction<Iterable<T> | object, [string, T]>
  • Type parameters

    • T

    Returns OperatorFunction<Iterable<T> | object, [string, T]>

recode

  • recode(type: RecodeTypes): OperatorFunction<string, string>
  • Parameters

    • type: RecodeTypes

    Returns OperatorFunction<string, string>
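RecodeTypes lists the four built-in URI functions, and the func object literal at the end of this page maps the same names to them. A minimal sketch of recode, assuming it simply applies the selected built-in function to every emitted string; recoders and recodeSketch are hypothetical names.

```typescript
type RecodeTypes = "encodeURIComponent" | "encodeURI" | "decodeURIComponent" | "decodeURI";

// Lookup table from recode type to the built-in global function, like the func object literal.
const recoders: { [K in RecodeTypes]: (text: string) => string } = {
  decodeURI,
  decodeURIComponent,
  encodeURI,
  encodeURIComponent,
};

// Apply the selected function to each value (array stand-in for the emitted strings).
function recodeSketch(type: RecodeTypes, values: string[]): string[] {
  const fn = recoders[type];
  return values.map((v) => fn(v));
}

const decoded = recodeSketch("decodeURIComponent", ["a%20b", "ua%3Dios"]);
// decoded: ["a b", "ua=ios"]
const encoded = recodeSketch("encodeURIComponent", ["a b&c"]);
// encoded: ["a%20b%26c"]
```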

resolve

  • Parameters

    Returns StringExpr

resolveArray

  • resolveArray(target: any[], parentKey: string): any[]
  • Parameters

    • target: any[]
    • parentKey: string

    Returns any[]

resolveBoolean

resolveItem

  • resolveItem<T>(item: T, key?: string | undefined): any
  • Type parameters

    • T

    Parameters

    • item: T
    • Optional key: string | undefined

    Returns any

resolveNumber

  • Parameters

    Returns NumberExpr

resolveString

  • resolveString(target: string, key?: string): string | expression.BooleanExpr | expression.NumberExpr | expression.StringExpr
  • Parameters

    • target: string
    • Optional key: string

    Returns string | expression.BooleanExpr | expression.NumberExpr | expression.StringExpr

selectPair

  • selectPair(index: string): OperatorFunction<Pair, Pair>
  • selectPair(index: string, ...indexs: string[]): OperatorFunction<Pair, Pair>
  • selectPair(drop: boolean, ...indexs: string[]): OperatorFunction<Pair, Pair>
  • Parameters

    • index: string

    Returns OperatorFunction<Pair, Pair>

  • Parameters

    • index: string
    • Rest ...indexs: string[]

    Returns OperatorFunction<Pair, Pair>

  • Parameters

    • drop: boolean
    • Rest ...indexs: string[]

    Returns OperatorFunction<Pair, Pair>

shuntFile

  • shuntFile provides an operator that writes each emitted value (string or Buffer) to a file and passes the value through unchanged.

    example
    
    of("A", "B", "C").pipe(shuntFile(filePath)).subscribe(
     (v) => console.log(v),  // "A"  "B"  "C"
     undefined,
     () => console.log(fs.readFileSync(filePath).toString())  // "ABC"
    );

    Type parameters

    • T: string | Buffer

    Parameters

    Returns OperatorFunction<T, T>

split

  • split(separater: string, keep?: boolean): OperatorFunction<string, string[]>
  • Parameters

    • separater: string
    • Default value keep: boolean = false

    Returns OperatorFunction<string, string[]>

splitAll

  • splitAll(separater: string, keep?: boolean): OperatorFunction<string, string>
  • Parameters

    • separater: string
    • Default value keep: boolean = false

    Returns OperatorFunction<string, string>

splitKeep

  • splitKeep(separater: string): OperatorFunction<string, string[]>
  • Parameters

    • separater: string

    Returns OperatorFunction<string, string[]>

toString

  • toString(): OperatorFunction<Object, string>
  • Returns OperatorFunction<Object, string>

uniquePairs

  • uniquePairs(type?: "abandon" | "rename" | "cover"): OperatorFunction<Pair[], Pair[]>
  • Parameters

    • Default value type: "abandon" | "rename" | "cover" = "cover"

    Returns OperatorFunction<Pair[], Pair[]>

Object literals

Const func

func: object

decodeURI

decodeURI: decodeURI

decodeURIComponent

decodeURIComponent: decodeURIComponent

encodeURI

encodeURI: encodeURI

encodeURIComponent

encodeURIComponent: encodeURIComponent