- Observable is an asynchronous pattern. In the Observable pattern we have an Observable and an Observer. Observer observes the Observable. In many implementations an Observer is also called as a Subscriber.
- An Observable can have many Observers (also called Subscribers).
- Observable emits items or notifications over time to which an Observer (also called Subscriber) can subscribe.
- When a subscriber subscribes to an Observable, the subscriber also specifies a callback function.
- This subscriber callback function is notified as and when the Observable emits items or notifications.
- Within this callback function we write code to handle data itmes or notifications received from the Observable
- Notice in the getEmployees() method, we are using the get() method of the angular http service to issue a get request over http. If you right click on get() method and go to it’s definition you will notice that this method return Observable
. - Observable
is not that useful to us, so we have set the return type of getEmployees() method to Observable - To convert Observable
to Observable we are using the map operator provided by rxjs.
import { Injectable } from '@angular/core';
import { IEmployee } from './employee';
// Import Http & Response from angular HTTP module
import { Http, Response } from '@angular/http';
// Import Observable from rxjs/Observable
import { Observable } from 'rxjs/Observable';
// Import the map operator
import 'rxjs/add/operator/map';
@Injectable()
export class EmployeeService {
// Inject Angular http service
constructor(private _http: Http) { }
// Notice the method return type is Observable<IEmployee[]>
getEmployees(): Observable<IEmployee[]> {
// To convert Observable<Response> to Observable<IEmployee[]>
// we are using the map operator
return this._http.get('http://localhost:24535/api/employees')
.map((response: Response) => <IEmployee[]>response.json());
}
}
Subscribe to the Observable returned by angular EmployeeService : EmployeeListComponent needs the employees data returned by the service. So in the ngOnInit() method of “employeeList.component.ts” use the subscribe method as shown below.
ngOnInit() {
this._employeeService.getEmployees()
.subscribe(employeesData => this.employees = employeesData);
}
Notice to the subscribe() function we are passing an other arrow function as a parameter. This arrow function is called when the Observable emits an item. In our case the Observable emits an array of IEmployee objects. employeesData parameter receives the array of IEmployee objects, which we are then using to initialise employees property of the EmployeeListComponent class.
We can specify upto 3 callback functions as parameters to the subscribe() method as shown below.
Callback Method | Purpose |
---|---|
onNext | The Observable calls this method whenever the Observable emits an item. The emitted item is passed as a parameter to this method |
onError | The Observable calls this method if there is an error |
onCompleted | The Observable calls this method after it has emitted all items, i.e after it has called onNext for the final time |
1. Observable comes from RxJs library
Example :
var obs$ : Observable = of(1,2,3);
obs$.subscribe(x => console.log())
Example :
import {Observable} from 'rxjs'
const test$ = new Observable(subscriber => {
//Producer - Push strategy
subscriber.next('1');
});
test$.subscriber(x => {
//consumer
console.log(x)
})
which something similar to the javascript function :
const test =() => {
console.log('normal test');
return 1;
}
var y = test.call();
console.log(y)
1. we can emit multiple values :
subscriber.next('1');
subscriber.next('2');
subscriber.next('3');
2. we can subscribe multiple places/times :
test$.subscribe(x => console.log('1st : ', x))
test$.subscribe(x => console.log('2st : ', x))
// 1st 1, 1st 2, 1st 3, 2st 1, 2st 2, 2st 3
3. Observable can be used in both synchronous(above example) and asynchronous
async Ex : setTimeout (()=> subscriber.next('4'), 1000)
4. if completed called remaining will not executed further :
subscribe.next('1');
subscribe.next('2');
subscribe.error('Error Occured');
subscribe.complete();
subscribe.next('3')
test$.subscribe((val, error, complete) => {
console.log('1st ', x )
})
// 1st 1, 1st 2
5. Subscribe has three parameters
test$.subscribe(
x => {console.log("1st ", x)},
error => {console.log("error", error)},
complete => {console.log('complete')},
);
//1st 1, 1st 2, complete
6. We can receive error in subscriber
subscribe.next('1');
subscribe.next('2');
subscribe.error('Error Occured');
test$.subscribe(
x => {console.log("1st ", x)},
error => {console.log("error", error)},
complete => {console.log('complete')},
);
//1st 1, 1st 2, error error occured
Create an observable that publishes events
app.component.html
<p (click)="remove()">
start editing to get result
</p>
export class AppComponent {
test$ = null;
constructor() {
this.test$ = this.fromEvent(document.body, 'click') //target, eventName
.subscribe(x=> {
console.log('body clicked');
})
}
remove(){
this.test$.unsubscribe();
}
function fromEvent(target, eventName) {
return new Observable((observer) => {
const handler = (e) => observer.next(e);
// Add the event handler to the target
target.addEventListener(eventName, handler);
return () => {
// Detach the event handler from the target
console.log('unsubscribe clicked')
target.removeEventListener(eventName, handler);
};
});
}
}
6. How create an observable
In actual use observable can be in different place and subscribe can be in different place but here we used with in a same component
app.component.ts
constructor(){
const test$ = new Observable(sub => {sub.next('Rahul');
sub.next('Ram')
});
test$.subscribe(x => {console.log('1 ',x)})
}
test$.subscribe(y => {console.log('2 ',y)})
}
Observables
- They are cold: Code gets executed when they have at least a single observer.
- Creates copy of data: Observable creates copy of data for each observer.
- Uni-directional: Observer can not assign value to observable(origin/master).
- The code will run for each observer . If its a HTTP call, it gets called for each observer.
- if its a service we want to share among all the components, it wont have latest result all new subscribers will still subscribe to same observable and get value from scratch
- Unicast means can emit values from the observable not from any other component.
Subject
- They are hot: code gets executed and value gets broadcast even if there is no observer.
- Shares data: Same data get shared between all observers.
- bi-directional: Observer can assign value to observable(origin/master).
- If are using using subject then you miss all the values that are broadcast before creation of observer. So here comes Replay Subject
- multicast, can cast values to multiple subscribers and can act as both subscribers and emmitter