The first plugin/module manager for stream-oriented web applications
If you have a growing webapp based on RxJS and/or Observables and you want to extend it with plugins, you may have noticed the Observable pattern is not directly compatible with the Request/Response nature of plugin systems.
We are bringing the best of both here by creating the Duplex Observable: a new kind of Observable Subject that supports two-way communication through new methods such as .invoke() and .reply(), in addition to new operators such as switchInvoke() and emit().
Using the above you can define your plugins as pure observable streams.
npm install -S the-observable-plugin-system
You can create a message bus and initialise your plugins in your main entry point:
main.js
import TOPS from 'the-observable-plugin-system';
import module1 from './plugins/module1.js';
import module2 from './plugins/module2.js';
function main() {
  TOPS({
    module1,
    module2,
  });
}
main();
Each plugin typically sits in its own file and exports a default function with this signature:
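import { delay, map } from 'rxjs/operators';
// switchInvoke and emit are the plugin system's own operators, described below;
// import them from wherever the package exposes them.

export default async ({ TOPIC1, TOPIC2, TOPIC3 }, config) => {
  TOPIC1.pipe(
    delay(1000),           // wait 1s before processing
    map(x => x + 1),       // add one
    switchInvoke(TOPIC2),  // send the result to TOPIC2 and listen for its replies
    emit(TOPIC3),          // forward each reply to TOPIC3
  ).reply();               // return the result to whoever emitted to TOPIC1
};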
Plugins can communicate with each other via a common message bus through a number of topics. Each topic takes the form of an Observable Subject that's available to plugins via their default-exported function.
The example above listens for messages coming from TOPIC1, delays processing by 1s, adds one, and emits the result to TOPIC2. All replies from TOPIC2 are emitted to TOPIC3 and then returned to the original caller who emitted to TOPIC1.
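One way to picture how topics could reach each plugin is the dependency-free sketch below. It is not the library's implementation: Topic, createBus and the GREET topic are hypothetical names, and creating topics lazily through a Proxy is an assumption made only so that plugins can destructure any topic name they like.

```javascript
// Hypothetical sketch, not the-observable-plugin-system itself.
// A tiny one-way topic: subscribers are called for every emitted value.
class Topic {
  constructor(name) {
    this.name = name;
    this.listeners = new Set();
  }
  subscribe(listener) {
    this.listeners.add(listener);
    return () => this.listeners.delete(listener); // unsubscribe handle
  }
  next(value) {
    for (const listener of [...this.listeners]) listener(value);
  }
}

// createBus (assumed name) hands every plugin the same registry of topics.
// Topics are created on first access, so plugins can destructure any name.
function createBus(plugins, config = {}) {
  const topics = new Map();
  const registry = new Proxy({}, {
    get(_target, name) {
      if (!topics.has(name)) topics.set(name, new Topic(name));
      return topics.get(name);
    },
  });
  for (const plugin of Object.values(plugins)) plugin(registry, config);
  return registry;
}

// Two toy plugins that only know each other through the GREET topic.
const received = [];
const listener = ({ GREET }) => { GREET.subscribe(msg => received.push(msg)); };
const greeter = ({ GREET }) => { GREET.next('hello'); };

createBus({ listener, greeter });
console.log(received); // [ 'hello' ]
```

The plugins never import one another; the shared topic is their only contact point, which is the property the message bus is meant to provide.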
The Duplex Stream is a particular type of RxJS Subject that enables two-way communication. This is essential for the request-response pattern used by a message bus and is not available in regular Observables.
Duplex streams bring two new instance methods: invoke and reply.
The .invoke() method emits data into a duplex stream and returns an observable of the responses.
This is lower-level functionality, mostly used internally by the plugin system. You would most likely use the invoke operator instead.
const stream = new Duplex();
stream.invoke(value);
The .reply() method makes it possible for a Duplex stream to return a value to the original emitter by simply taking values coming from the pipeline and re-emitting them into the caller stream.
const stream = new Duplex().pipe(
...operators,
map(data => asReturnValue(data)),
).reply();
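The interplay between invoking and replying can be modelled in a few lines without RxJS. The sketch below is a toy, not the library's Duplex class: MiniDuplex and its envelope shape ({ data, respond }) are hypothetical, and it assumes the request is only sent once the caller subscribes to the responses.

```javascript
// Hypothetical MiniDuplex: a toy model of invoke/reply, not the real Duplex.
class MiniDuplex {
  constructor() {
    this.subscribers = new Set();
  }
  subscribe(fn) {
    this.subscribers.add(fn);
  }
  next(envelope) {
    for (const fn of this.subscribers) fn(envelope);
  }

  // Send a request; the returned object is a minimal "observable" of responses.
  // Each request travels in an envelope that remembers how to reach the caller.
  invoke(data) {
    return {
      subscribe: (onResponse) => this.next({ data, respond: onResponse }),
    };
  }

  // A pipeline stage: transform the data, keep the way back to the caller.
  map(fn) {
    const out = new MiniDuplex();
    this.subscribe(({ data, respond }) => out.next({ data: fn(data), respond }));
    return out;
  }

  // Terminal stage: whatever leaves the pipeline is sent back to the caller.
  reply() {
    this.subscribe(({ data, respond }) => respond(data));
  }
}

const double = new MiniDuplex();
double.map(x => x * 2).reply();

const responses = [];
double.invoke(21).subscribe(r => responses.push(r));
console.log(responses); // [ 42 ]
```

Carrying the return path alongside the data is what lets ordinary transformations sit between the request and the reply.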
These are special operators for Duplex streams that help implement various calling patterns:
switchInvoke() is similar to RxJS switchMap. It emits the current value from the source stream into the specified topic and returns a new stream of the responses until a new value is received from the source.
Another operator is similar to RxJS mergeMap. It emits the current value from the source stream into the specified topic and re-emits all returned values as they arrive.
emit() is a fire-and-forget operator that simply emits the current value into the specified stream and moves on.
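The difference between the three calling patterns is easiest to see when replies arrive late. The sketch below is dependency-free and does not use the library's operators: makeTopic, switchStyle, mergeStyle and fireAndForget are hypothetical helpers, and replies are delivered by hand through replyTo so the ordering is explicit.

```javascript
// Hypothetical helpers illustrating the calling patterns; not the library's code.
// A toy topic whose replies are delivered manually via replyTo(index, reply).
function makeTopic() {
  const pending = []; // one entry per invocation, in call order
  return {
    invoke(value) {
      return {
        subscribe(onReply) {
          const entry = { value, onReply, active: true };
          pending.push(entry);
          return () => { entry.active = false; }; // unsubscribe handle
        },
      };
    },
    replyTo(index, reply) {
      const entry = pending[index];
      if (entry.active) entry.onReply(reply);
    },
  };
}

// switchMap-like: a new value cancels interest in replies to the previous one.
function switchStyle(topic, sink) {
  let cancelPrevious = () => {};
  return (value) => {
    cancelPrevious();
    cancelPrevious = topic.invoke(value).subscribe(sink);
  };
}

// mergeMap-like: replies to every invocation are forwarded as they arrive.
function mergeStyle(topic, sink) {
  return (value) => { topic.invoke(value).subscribe(sink); };
}

// Fire-and-forget: send the value, ignore replies, pass the value along.
function fireAndForget(topic) {
  return (value) => {
    topic.invoke(value).subscribe(() => {});
    return value;
  };
}

const switched = [];
const topicA = makeTopic();
const sendA = switchStyle(topicA, r => switched.push(r));
sendA('a');
sendA('b');
topicA.replyTo(0, 'reply to a'); // dropped: 'a' was superseded by 'b'
topicA.replyTo(1, 'reply to b');

const merged = [];
const topicB = makeTopic();
const sendB = mergeStyle(topicB, r => merged.push(r));
sendB('a');
sendB('b');
topicB.replyTo(0, 'reply to a');
topicB.replyTo(1, 'reply to b');

const passedThrough = fireAndForget(makeTopic())('c');

console.log(switched);      // [ 'reply to b' ]
console.log(merged);        // [ 'reply to a', 'reply to b' ]
console.log(passedThrough); // c
```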
WIP
A Stream-Oriented Webapp where everything is a module, everything is a stream