kahuna/public/js/util/rx.js (42 lines of code) (raw):
import angular from 'angular';
import 'rx-angular';
/**
 * Angular module `util.rx`: extends rx-angular for easier integration.
 *
 * Declares a dependency on the `rx` Angular module (presumably registered by
 * the `rx-angular` import at the top of this file -- confirm).
 */
// `const` rather than `var`: the binding is only ever used to register
// factories and is never reassigned.
export const rxUtil = angular.module('util.rx', [
    'rx'
]);
// TODO: contribute these back to rx-angular?
/**
 * Registers the `observe$` factory.
 *
 * `observe$(scope, watchExpression, objectEquality)` forwards its three
 * arguments to the injected `observeOnScope` service and maps every emitted
 * item to that item's `newValue` property.
 *
 * NOTE(review): `observeOnScope` is presumably the rx-angular service that
 * emits `{oldValue, newValue}` change records -- confirm.
 */
rxUtil.factory('observe$', ['observeOnScope', function(observeOnScope) {
    // Projection applied to each emitted change record.
    const toNewValue = (change) => change.newValue;

    return function observe$(scope, watchExpression, objectEquality) {
        const changes$ = observeOnScope(scope, watchExpression, objectEquality);
        return changes$.map(toNewValue);
    };
}]);
/**
 * Registers the `observeCollection$` factory.
 *
 * `observeCollection$(scope, watchExpression)` forwards its arguments to the
 * injected `observeCollectionOnScope` service and maps every emitted item to
 * that item's `newValue` property.
 */
rxUtil.factory('observeCollection$', [
    'observeCollectionOnScope',
    function(observeCollectionOnScope) {
        // Projection applied to each emitted change record.
        const toNewValue = (change) => change.newValue;

        return function observeCollection$(scope, watchExpression) {
            const changes$ = observeCollectionOnScope(scope, watchExpression);
            return changes$.map(toNewValue);
        };
    }
]);
/**
 * Registers the `subscribe$` factory.
 *
 * `subscribe$(scope, observable$, ...observer)` calls
 * `observable$.subscribe` with the remaining arguments and registers a
 * `$destroy` listener on `scope` that disposes the resulting subscription.
 * It returns nothing.
 */
rxUtil.factory('subscribe$', [function() {
    function subscribe$(scope, observable$, ...observer) {
        const subscription = observable$.subscribe(...observer);

        // Tie the subscription's lifetime to the scope's.
        const disposeSubscription = () => subscription.dispose();
        scope.$on('$destroy', disposeSubscription);
    }

    return subscribe$;
}]);
/**
 * Registers the `observeCollectionOnScope` factory.
 *
 * The produced function takes `(scope, watchExpression)` and returns an
 * observable built with `rx.Observable.create`. On subscription it installs
 * a `scope.$watchCollection` watcher whose listener pushes
 * `{ oldValue, newValue }` objects to the observer via `onNext`.
 */
rxUtil.factory('observeCollectionOnScope', ['rx', function(rx) {
    return function(scope, watchExpression) {
        return rx.Observable.create((observer) => {
            // Forward each reported change as an { oldValue, newValue } record.
            const emitChange = (newValue, oldValue) => {
                observer.onNext({ oldValue, newValue });
            };

            // The value returned by $watchCollection (the function that
            // disconnects the watcher) is handed back to Observable.create.
            const stopWatching = scope.$watchCollection(watchExpression, emitChange);
            return stopWatching;
        });
    };
}]);
/**
 * Registers the `inject$` factory.
 *
 * `inject$(scope, observable$, context, name)` subscribes to `observable$`
 * through the injected `subscribe$` service and writes every emitted value
 * to `context[name]`. It returns nothing.
 */
rxUtil.factory('inject$', ['subscribe$', function(subscribe$) {
    return function inject$(scope, observable$, context, name) {
        // Handler for emitted values: store the latest one on the context.
        const storeOnContext = (value) => {
            context[name] = value;
        };

        subscribe$(scope, observable$, storeOnContext);
    };
}]);