@@ -8,7 +8,7 @@ import QueryStream from 'pg-query-stream';
8
8
import Logger from '../services/logger' ;
9
9
import Cache from '../services/cache/cache' ;
10
10
import Config from '../services/config' ;
11
- import { plainToClass , classToPlain } from 'class-transformer' ;
11
+ import { plainToClass , classToPlain , instanceToPlain , plainToInstance } from 'class-transformer' ;
12
12
import { parentPort } from 'worker_threads' ;
13
13
import EdgeQueueItemMapper from '../data_access_layer/mappers/data_warehouse/data/edge_queue_item_mapper' ;
14
14
import { EdgeQueueItem } from '../domain_objects/data_warehouse/data/edge' ;
@@ -26,29 +26,35 @@ void postgresAdapter
26
26
const emitter = ( ) => {
27
27
void postgresAdapter . Pool . connect ( ( err , client , done ) => {
28
28
const stream = client . query ( new QueryStream ( mapper . needRetriedStreamingStatement ( ) ) ) ;
29
+ const promises : Promise < boolean > [ ] = [ ] ;
29
30
const putPromises : Promise < boolean > [ ] = [ ] ;
30
31
31
32
stream . on ( 'data' , ( data ) => {
32
- const item = plainToClass ( EdgeQueueItem , data as object ) ;
33
+ const item = plainToInstance ( EdgeQueueItem , data as object ) ;
33
34
34
35
// check to see if the edge queue item is in the cache, indicating that there is a high probability that
35
36
// this message is already in the queue and either is being processed or waiting to be processed
36
- Cache . get ( `edge_insertion_${ item . id } ` )
37
- . then ( ( set ) => {
38
- if ( ! set ) {
39
- // if the item isn't the cache, we can go ahead and queue data
40
- putPromises . push ( queue . Put ( Config . edge_insertion_queue , classToPlain ( item ) ) ) ;
41
- }
42
- } )
43
- // if we error out we need to go ahead and queue this message anyway, just so we're not dropping
44
- // data
45
- . catch ( ( e ) => {
46
- Logger . error ( `error reading from cache for staging emitter ${ e } ` ) ;
47
- putPromises . push ( queue . Put ( Config . edge_insertion_queue , classToPlain ( item ) ) ) ;
48
- } )
49
- . finally ( ( ) => {
50
- void Cache . set ( `edge_insertion_${ item . id } ` , { } , Config . initial_import_cache_ttl ) ;
51
- } ) ;
37
+ promises . push (
38
+ new Promise ( ( resolve ) => {
39
+ Cache . get ( `edge_insertion_${ item . id } ` )
40
+ . then ( ( set ) => {
41
+ if ( ! set ) {
42
+ // if the item isn't the cache, we can go ahead and queue data
43
+ putPromises . push ( queue . Put ( Config . edge_insertion_queue , instanceToPlain ( item ) ) ) ;
44
+ }
45
+ } )
46
+ // if we error out we need to go ahead and queue this message anyway, just so we're not dropping
47
+ // data
48
+ . catch ( ( e ) => {
49
+ Logger . error ( `error reading from cache for staging emitter ${ e } ` ) ;
50
+ putPromises . push ( queue . Put ( Config . edge_insertion_queue , instanceToPlain ( item ) ) ) ;
51
+ } )
52
+ . finally ( ( ) => {
53
+ void Cache . set ( `edge_insertion_${ item . id } ` , { } , Config . initial_import_cache_ttl ) ;
54
+ resolve ( true ) ;
55
+ } ) ;
56
+ } ) ,
57
+ ) ;
52
58
} ) ;
53
59
54
60
stream . on ( 'error' , ( e : Error ) => {
@@ -62,11 +68,13 @@ void postgresAdapter
62
68
stream . on ( 'end' , ( ) => {
63
69
done ( ) ;
64
70
65
- Promise . all ( putPromises ) . finally ( ( ) => {
66
- if ( parentPort ) parentPort . postMessage ( 'done' ) ;
67
- else {
68
- process . exit ( 0 ) ;
69
- }
71
+ Promise . all ( promises ) . finally ( ( ) => {
72
+ Promise . all ( putPromises ) . finally ( ( ) => {
73
+ if ( parentPort ) parentPort . postMessage ( 'done' ) ;
74
+ else {
75
+ process . exit ( 0 ) ;
76
+ }
77
+ } ) ;
70
78
} ) ;
71
79
} ) ;
72
80
0 commit comments