В этой статье я постараюсь детально охватить узкую сферу применения технологии в рамках фреймворка Angular и его уже неотъемлемого помощника — RxJs, при этом мы намеренно не будем касаться серверных реализаций, т.к. это полноценная тема для отдельной статьи.
Данный текст будет полезен тем, кто уже знаком с Angular, но хочет углубить свои знания непосредственно по теме.
Для начала немного базовой информации.
Согласно Википедии, WebSocket это «протокол дуплексной связи (может передавать и принимать одновременно) поверх TCP-соединения, предназначенный для обмена сообщениями между браузером и веб-сервером в режиме реального времени.
WebSocket разработан для воплощения в web-браузерах и web-серверах, но он может быть использован для любого клиентского или серверного приложения. Протокол WebSocket?-?это независимый протокол, основанный на протоколе TCP. Он делает возможным более тесное взаимодействие между браузером и web-сайтом, способствуя распространению интерактивного содержимого и созданию приложений реального времени.»
const ws = new WebSocket("ws://www.example.com/socketserver", "protocolOne");
ws.onopen = () => {
ws.onmessage = (event) => {
console.log(event);
}
ws.send("Here's some text that the server is urgently awaiting!");
};
const wsUser = new WebSocket("ws://www.example.com/user");
wsUser.onmessage = (event) => { // ... };
const wsNews = new WebSocket("ws://www.example.com/news");
wsNews.onmessage = (event) => { // ... };
const wsTime = new WebSocket("ws://www.example.com/time");
wsTime.onmessage = (event) => { // ... };
const wsDinner = new WebSocket("ws://www.example.com/dinner");
wsDinner.onmessage = (event) => { // ... };
const wsCurrency = new WebSocket("ws://www.example.com/currency");
wsCurrency.onmessage = (event) => { // ... };
const wsOnline = new WebSocket("ws://www.example.com/online");
wsOnline.onmessage = (event) => { // ... };
const wsLogin = new WebSocket("ws://www.example.com/login");
wsLogin.onmessage = (event) => { // ... };
const wsLogout = new WebSocket("ws://www.example.com/logout");
wsLogout.onmessage = (event) => { // ... };
ws.on("user", (userData) => { / .. })
{
"event": "user",
"data": {
"name": "John Doe",
...
}
}
const ws = new WebSocket("ws://www.example.com");
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.event === 'user') {
// ...
}
if (data.event === 'news') {
// ...
}
};
export interface WebSocketConfig {
url: string;
reconnectInterval?: number;
reconnectAttempts?: number;
}
export class WebsocketModule {
public static config(wsConfig: WebSocketConfig): ModuleWithProviders {
return {
ngModule: WebsocketModule,
providers: [{ provide: config, useValue: wsConfig }]
};
}
}
export interface IWsMessage<T> {
event: string;
data: T;
}
export interface IWebsocketService {
on<T>(event: string): Observable<T>;
send(event: string, data: any): void;
status: Observable<boolean>;
}
// объект конфигурации WebSocketSubject
private config: WebSocketSubjectConfig<IWsMessage<any>>;
private websocketSub: SubscriptionLike;
private statusSub: SubscriptionLike;
// Observable для реконнекта по interval
private reconnection$: Observable<number>;
private websocket$: WebSocketSubject<IWsMessage<any>>;
// сообщает, когда происходит коннект и реконнект
private connection$: Observer<boolean>;
// вспомогательный Observable для работы с подписками на сообщения
private wsMessages$: Subject<IWsMessage<any>>;
// пауза между попытками реконнекта в милисекундах
private reconnectInterval: number;
// количество попыток реконнекта
private reconnectAttempts: number;
// синхронный вспомогатель для статуса соединения
private isConnected: boolean;
// статус соединения
public status: Observable<boolean>;
constructor(@Inject(config) private wsConfig: WebSocketConfig) {
this.wsMessages$ = new Subject<IWsMessage<any>>();
// смотрим конфиг, если пусто, задаем умолчания для реконнекта
this.reconnectInterval = wsConfig.reconnectInterval || 5000;
this.reconnectAttempts = wsConfig.reconnectAttempts || 10;
// при сворачивании коннекта меняем статус connection$ и глушим websocket$
this.config = {
url: wsConfig.url,
closeObserver: {
next: (event: CloseEvent) => {
this.websocket$ = null;
this.connection$.next(false);
}
},
// при коннекте меняем статус connection$
openObserver: {
next: (event: Event) => {
console.log('WebSocket connected!');
this.connection$.next(true);
}
}
};
// connection status
this.status = new Observable<boolean>((observer) => {
this.connection$ = observer;
}).pipe(share(), distinctUntilChanged());
// запускаем реконнект при отсутствии соединения
this.statusSub = this.status
.subscribe((isConnected) => {
this.isConnected = isConnected;
if (!this.reconnection$ && typeof(isConnected) === 'boolean' && !isConnected) {
this.reconnect();
}
});
// говорим, что что-то пошло не так
this.websocketSub = this.wsMessages$.subscribe(
null, (error: ErrorEvent) => console.error('WebSocket error!', error)
);
// коннектимся
this.connect();
}
private connect(): void {
this.websocket$ = new WebSocketSubject(this.config); // создаем
// если есть сообщения, шлем их в дальше,
// если нет, ожидаем
// реконнектимся, если получили ошибку
this.websocket$.subscribe(
(message) => this.wsMessages$.next(message),
(error: Event) => {
if (!this.websocket$) {
// run reconnect if errors
this.reconnect();
}
});
}
private reconnect(): void {
// Создаем interval со значением из reconnectInterval
this.reconnection$ = interval(this.reconnectInterval)
.pipe(takeWhile((v, index) => index < this.reconnectAttempts && !this.websocket$));
// Пытаемся подключиться пока не подключимся, либо не упремся в ограничение попыток подключения
this.reconnection$.subscribe(
() => this.connect(),
null,
() => {
// Subject complete if reconnect attemts ending
this.reconnection$ = null;
if (!this.websocket$) {
this.wsMessages$.complete();
this.connection$.complete();
}
});
}
public on<T>(event: string): Observable<T> {
if (event) {
return this.wsMessages$.pipe(
filter((message: IWsMessage<T>) => message.event === event),
map((message: IWsMessage<T>) => message.data)
);
}
}
public send(event: string, data: any = {}): void {
if (event && this.isConnected) {
// костыль с any потому, что на "том" конце ожидается string
// более изящный костыль не придумался :)
this.websocket$.next(<any>JSON.stringify({ event, data }));
} else {
console.error('Send error!');
}
}
imports: [
WebsocketModule.config({
url: environment.ws // или просто ссылка в виде 'ws://www.example.com'
})
]
constructor(private wsService: WebsocketService) {
this.wsService.on<IMessage[]>('messages')
.subscribe((messages: IMessage[]) => {
console.log(messages);
this.wsService.send('text', 'Test Text!');
});
}
export const WS = {
ON: {
MESSAGES: 'messages'
},
SEND: {
TEXT: 'text'
}
};
this.wsService.on<IMessage[]>(WS.ON.MESSAGES)
.subscribe((messages: IMessage[]) => {
console.log(messages);
this.wsService.send(WS.SEND.TEXT, 'Test Text!');
});
К сожалению, не доступен сервер mySQL