Skip to content

Commit aefaf95

Browse files
committed
feat(nestjs-json-rpc-sdk): add takeUntil for disconnect socket
1 parent 31450b3 commit aefaf95

File tree

14 files changed

+389
-110
lines changed

14 files changed

+389
-110
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
- *[json-api-nestjs](https://github.com/klerick/nestjs-json-api/tree/master/libs/json-api/json-api-nestjs)* - plugin for create CRUD overs JSON API
1111
- *[json-api-nestjs-sdk](https://github.com/klerick/nestjs-json-api/tree/master/libs/json-api/json-api-nestjs-sdk)* - tool for client, call api over *json-api-nestjs*
12+
- *[nestjs-json-rpc](https://github.com/klerick/nestjs-json-api/tree/master/libs/json-rpc/nestjs-json-rpc)* - plugin for create RPC server using [JSON-RPC](https://www.jsonrpc.org/)
13+
- *[nestjs-json-rpc-sdk](https://github.com/klerick/nestjs-json-api/tree/master/libs/json-rpc/nestjs-json-rpc-sdk)* - tool for client, call RPC server *nestjs-json-rpc*
1214
- *json-api-nestjs-acl* - tool for acl over *json-api-nestjs*(coming soon...)
1315
## Installation
1416

Lines changed: 58 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,60 @@
1-
import { ApplicationConfig, importProvidersFrom } from '@angular/core';
1+
import {
2+
ApplicationConfig,
3+
importProvidersFrom,
4+
InjectionToken,
5+
} from '@angular/core';
26
import { JsonApiAngular } from 'json-api-nestjs-sdk/json-api-nestjs-sdk.module';
37
import {
48
JsonRpcAngular,
9+
JsonRpcAngularConfig,
510
TransportType,
611
} from '@klerick/nestjs-json-rpc-sdk/json-rpc-sdk.module';
7-
import io from 'socket.io-client';
12+
import { Subject } from 'rxjs';
13+
import { webSocket } from 'rxjs/webSocket';
14+
import { io } from 'socket.io-client';
15+
16+
const destroySubject = new Subject<boolean>();
17+
setTimeout(() => {
18+
console.log('Disconnect');
19+
destroySubject.next(true);
20+
destroySubject.complete();
21+
}, 5000);
22+
const destroySubjectToken = new InjectionToken('destroySubjectToken', {
23+
factory: () => destroySubject,
24+
});
25+
destroySubject.subscribe((r) => console.log(r));
26+
const tokenSocketInst = new InjectionToken('tokenSocketInst', {
27+
factory: () => webSocket('ws://localhost:4200/rpc'),
28+
});
29+
30+
const tokenIoSocketInst = new InjectionToken('tokenIoSocketInst', {
31+
factory: () => io('http://localhost:3000', { path: '/rpc' }),
32+
});
33+
34+
const httpConfig: JsonRpcAngularConfig = {
35+
transport: TransportType.HTTP,
36+
rpcPath: '/api/rpc',
37+
rpcHost: 'http://localhost:4200',
38+
};
39+
const wsConfig: JsonRpcAngularConfig = {
40+
transport: TransportType.WS,
41+
useWsNativeSocket: true,
42+
rpcPath: 'rpc',
43+
rpcHost: 'ws://localhost:4200',
44+
destroySubjectToken,
45+
};
46+
const wsConfigWithToken: JsonRpcAngularConfig = {
47+
transport: TransportType.WS,
48+
useWsNativeSocket: true,
49+
tokenSocketInst,
50+
destroySubjectToken,
51+
};
52+
const ioConfig: JsonRpcAngularConfig = {
53+
transport: TransportType.WS,
54+
useWsNativeSocket: false,
55+
destroySubjectToken,
56+
tokenSocketInst: tokenIoSocketInst,
57+
};
858

959
export const appConfig: ApplicationConfig = {
1060
providers: [
@@ -17,14 +67,12 @@ export const appConfig: ApplicationConfig = {
1767
})
1868
),
1969
importProvidersFrom(
20-
JsonRpcAngular.forRoot({
21-
transport: TransportType.WS,
22-
rpcPath: 'rpc',
23-
rpcHost: 'ws://localhost:4200',
24-
useWsNativeSocket: true,
25-
// useWsNativeSocket: false,
26-
// webSocketCtor: io('http://localhost:3000', { path: '/rpc' }),
27-
})
70+
JsonRpcAngular.forRoot(
71+
// httpConfig
72+
// wsConfig
73+
// wsConfigWithToken,
74+
ioConfig
75+
)
2876
),
2977
],
3078
};

apps/json-api-server-e2e/src/json-api/json-rpc/run-ws-json-rpc.spec.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,12 @@ import {
55
RpcError,
66
} from '@klerick/nestjs-json-rpc-sdk';
77

8-
import { creatWsRpcSdk, MapperRpc, run } from '../utils/run-application';
8+
import {
9+
creatWsRpcSdk,
10+
MapperRpc,
11+
run,
12+
destroySubject,
13+
} from '../utils/run-application';
914

1015
let app: INestApplication;
1116

@@ -14,6 +19,8 @@ beforeAll(async () => {
1419
});
1520

1621
afterAll(async () => {
22+
destroySubject.next(true);
23+
destroySubject.complete();
1724
await app.close();
1825
});
1926

apps/json-api-server-e2e/src/json-api/utils/run-application.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import { AppModule } from '../../../../json-api-server/src/app/app.module';
1616

1717
import { JsonConfig } from '../../../../../libs/json-api/json-api-nestjs-sdk/src/lib/types';
1818
import { WsAdapter } from '@nestjs/platform-ws';
19+
import { Subject } from 'rxjs';
1920

2021
export const axiosAdapter = adapterForAxios(axios);
2122
let saveApp: INestApplication;
@@ -70,15 +71,16 @@ export const creatRpcSdk = (config: Partial<RpcConfig> = {}) =>
7071
},
7172
true
7273
);
73-
74+
export const destroySubject = new Subject<boolean>();
7475
export const creatWsRpcSdk = (config: Partial<RpcConfig> = {}) =>
7576
RpcFactory<MapperRpc>(
7677
{
7778
transport: TransportType.WS,
7879
useWsNativeSocket: true,
79-
webSocketCtor: WebSocket,
80+
nativeSocketImplementation: WebSocket,
8081
rpcHost: `http://localhost:${port}`,
8182
rpcPath: `/rpc`,
83+
destroySubject,
8284
},
8385
true
8486
);
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
import { inject, InjectionToken } from '@angular/core';
2+
import { HttpClient } from '@angular/common/http';
3+
import { Subject } from 'rxjs';
4+
import { WebSocketSubject } from 'rxjs/internal/observable/dom/WebSocketSubject';
5+
import { Socket } from 'socket.io-client';
6+
7+
import {
8+
LoopFunc,
9+
PayloadRpc,
10+
RpcResult,
11+
RpcReturnList,
12+
RpcConfig,
13+
TransportType,
14+
} from '../types';
15+
import { transportFactory } from '../factory';
16+
import { webSocketFactory, WsResponse } from '../factory/ws-transport.factory';
17+
18+
import { JSON_RPC_SDK_CONFIG, JSON_RPC_SDK_TRANSPORT } from './tokens';
19+
import { RpcBatchFactory, rpcProxy } from '../utils';
20+
21+
export function rpcBatchFactory() {
22+
return RpcBatchFactory(inject(JSON_RPC_SDK_TRANSPORT));
23+
}
24+
25+
export function rpcFactory() {
26+
return rpcProxy<RpcReturnList<any, true>>(
27+
inject(JSON_RPC_SDK_TRANSPORT),
28+
false
29+
);
30+
}
31+
32+
export function angularTransportFactory() {
33+
const angularConfig = inject(JSON_RPC_SDK_CONFIG);
34+
const httpClient = inject(HttpClient);
35+
36+
if (angularConfig.transport === TransportType.HTTP) {
37+
const rpcConfig: RpcConfig = {
38+
transport: angularConfig.transport,
39+
httpAgentFactory: (url: string) => (body: PayloadRpc<LoopFunc>) =>
40+
httpClient.post<RpcResult<LoopFunc>>(url, body),
41+
rpcPath: angularConfig.rpcPath,
42+
rpcHost: angularConfig.rpcHost,
43+
};
44+
return transportFactory(rpcConfig);
45+
}
46+
47+
const destroySubject =
48+
(angularConfig.destroySubjectToken &&
49+
inject<Subject<boolean>>(angularConfig.destroySubjectToken, {
50+
optional: true,
51+
})) ||
52+
new Subject<boolean>();
53+
54+
if (angularConfig.useWsNativeSocket) {
55+
let socketInst:
56+
| WebSocketSubject<WsResponse<PayloadRpc<LoopFunc> | RpcResult<LoopFunc>>>
57+
| undefined = undefined;
58+
if ('tokenSocketInst' in angularConfig) {
59+
socketInst =
60+
inject<
61+
WebSocketSubject<
62+
WsResponse<PayloadRpc<LoopFunc> | RpcResult<LoopFunc>>
63+
>
64+
>(angularConfig['tokenSocketInst'], { optional: true }) || undefined;
65+
} else {
66+
const url = new URL(
67+
angularConfig.rpcPath,
68+
angularConfig.rpcHost
69+
).toString();
70+
socketInst = webSocketFactory(
71+
url,
72+
angularConfig.nativeSocketImplementation
73+
);
74+
}
75+
76+
if (socketInst === undefined) throw new Error('Cant create socket inst');
77+
const rpcConfig: RpcConfig = {
78+
transport: angularConfig.transport,
79+
useWsNativeSocket: angularConfig.useWsNativeSocket,
80+
nativeSocketInstance: socketInst,
81+
destroySubject,
82+
};
83+
84+
return transportFactory(rpcConfig);
85+
}
86+
const ioSocketInstance = inject<Socket>(angularConfig['tokenSocketInst']);
87+
const rpcConfig: RpcConfig = {
88+
transport: angularConfig.transport,
89+
useWsNativeSocket: angularConfig.useWsNativeSocket,
90+
ioSocketInstance: ioSocketInstance,
91+
destroySubject,
92+
};
93+
return transportFactory(rpcConfig);
94+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import { ModuleWithProviders, NgModule } from '@angular/core';
2+
import { HttpClientModule } from '@angular/common/http';
3+
4+
import { JSON_RPC_SDK_CONFIG } from './tokens';
5+
import { JsonRpcAngularConfig } from '../types';
6+
7+
@NgModule({
8+
imports: [HttpClientModule],
9+
})
10+
export class JsonRpcAngular {
11+
static forRoot(
12+
config: JsonRpcAngularConfig
13+
): ModuleWithProviders<JsonRpcAngular> {
14+
return {
15+
ngModule: JsonRpcAngular,
16+
providers: [
17+
{
18+
useValue: config,
19+
provide: JSON_RPC_SDK_CONFIG,
20+
},
21+
],
22+
};
23+
}
24+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import { InjectionToken } from '@angular/core';
2+
import { LoopFunc, RpcBatch, RpcReturnList, Transport } from '../types';
3+
4+
import { JsonRpcAngularConfig } from '@klerick/nestjs-json-rpc-sdk/json-rpc-sdk.module';
5+
import {
6+
angularTransportFactory,
7+
rpcBatchFactory,
8+
rpcFactory,
9+
} from './factory';
10+
11+
export const JSON_RPC_SDK_CONFIG = new InjectionToken<JsonRpcAngularConfig>(
12+
'Main config object for sdk'
13+
);
14+
15+
export const JSON_RPC_SDK_TRANSPORT = new InjectionToken<Transport<LoopFunc>>(
16+
'Transport for RPC',
17+
{
18+
factory: angularTransportFactory,
19+
}
20+
);
21+
22+
export const JSON_RPC = new InjectionToken<RpcReturnList<object, false>>(
23+
'Rpc client',
24+
{
25+
factory: rpcFactory,
26+
}
27+
);
28+
29+
export const RPC_BATCH = new InjectionToken<RpcBatch>('Rpc client for batch', {
30+
factory: rpcBatchFactory,
31+
});

libs/json-rpc/nestjs-json-rpc-sdk/src/lib/factory/io-transport.factory.ts

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,19 @@
1-
import { filter, of, Subject, switchMap, take, tap } from 'rxjs';
21
import type { Socket } from 'socket.io-client';
2+
import {
3+
filter,
4+
Observable,
5+
Observer,
6+
of,
7+
Subject,
8+
Subscription,
9+
switchMap,
10+
take,
11+
takeUntil,
12+
tap,
13+
} from 'rxjs';
14+
import { Subscriber } from 'rxjs/internal/Subscriber';
15+
import { TeardownLogic } from 'rxjs/internal/types';
16+
317
import { LoopFunc, PayloadRpc, RpcResult, Transport } from '../types';
418
import { WS_EVENT_NAME } from '../constans';
519

@@ -11,18 +25,51 @@ interface ClientToServerEvents<T extends LoopFunc> {
1125
rpc: (payload: PayloadRpc<T>) => void;
1226
}
1327

28+
class SocketIo<T extends LoopFunc> extends Observable<RpcResult<T>> {
29+
private messageQueue: PayloadRpc<T>[] = [];
30+
constructor(
31+
private io: Socket<ServerToClientEvents<T>, ClientToServerEvents<T>>
32+
) {
33+
super((subscriber) => this.subscribeForObservable(subscriber));
34+
this.io.on('connect', () => {
35+
while (this.messageQueue.length > 0) {
36+
const msg = this.messageQueue.shift();
37+
if (!msg) break;
38+
this.io.emit(WS_EVENT_NAME, msg);
39+
}
40+
});
41+
}
42+
43+
private subscribeForObservable(
44+
subscriber: Subscriber<RpcResult<T>>
45+
): TeardownLogic {
46+
this.io.on(WS_EVENT_NAME, (value) => subscriber.next(value));
47+
this.io.on('connect_error', (error: Error) => subscriber.error(error));
48+
this.io.on('disconnect', () => subscriber.complete());
49+
return { unsubscribe: () => this.io.close() };
50+
}
51+
52+
public next(message: PayloadRpc<T>): void {
53+
if (!this.io.connected) {
54+
this.messageQueue.push(message);
55+
return;
56+
}
57+
58+
this.io.emit(WS_EVENT_NAME, message);
59+
}
60+
}
61+
1462
export function ioTransportFactory<T extends LoopFunc>(
15-
io: Socket<ServerToClientEvents<T>, ClientToServerEvents<T>>
63+
io: Socket<ServerToClientEvents<T>, ClientToServerEvents<T>>,
64+
destroyFactory: Subject<boolean>
1665
): Transport<T> {
17-
const subjectData = new Subject<RpcResult<T>>();
18-
io.on(WS_EVENT_NAME, (event) => subjectData.next(event));
19-
66+
const socketSubject = new SocketIo(io).pipe(takeUntil(destroyFactory));
2067
return (body: PayloadRpc<T>) => {
2168
const { id } = body;
2269
return of(true).pipe(
2370
tap(() => io.emit(WS_EVENT_NAME, body)),
2471
switchMap(() =>
25-
subjectData.pipe(filter((response) => response.id === id))
72+
socketSubject.pipe(filter((response) => response.id === id))
2673
),
2774
take(1)
2875
);

0 commit comments

Comments
 (0)