Skip to content

Commit 5df2025

Browse files
authored
Merge pull request #39 from hapinessjs/next
Next into master
2 parents 26a3b10 + 66f7db8 commit 5df2025

40 files changed

+732
-578
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,8 @@ To set up your development environment:
453453
[Back to top](#table-of-contents)
454454

455455
## Change History
456+
* v1.5.0 (2018-08-24)
457+
* Add possibility to provide a custom MessageRouter
456458
* v1.4.3 (2018-08-20)
457459
* Emit RETRY_LIMIT_EXCEEDED error on ConnectionManager
458460
* v1.4.2 (2018-06-11)

package-lock.json

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@hapiness/rabbitmq",
3-
"version": "1.4.3",
3+
"version": "1.5.0",
44
"description": "Hapiness module for rabbitmq",
55
"main": "commonjs/index.js",
66
"types": "index.d.ts",

src/module/decorators.ts

Lines changed: 0 additions & 60 deletions
This file was deleted.
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import { createDecorator, CoreDecorator, Type } from '@hapiness/core';
2+
import { Options } from 'amqplib';
3+
import { ExchangeType, ChannelOptions } from '../interfaces';
4+
5+
export interface ExchangeDecoratorInterface {
6+
name: string;
7+
type: ExchangeType;
8+
options?: Options.AssertExchange;
9+
channel?: ChannelOptions;
10+
providers?: Array<Type<any> | any>;
11+
}
12+
export const Exchange: CoreDecorator<ExchangeDecoratorInterface> = createDecorator<ExchangeDecoratorInterface>('Exchange', {
13+
name: undefined,
14+
type: undefined,
15+
options: undefined,
16+
channel: undefined,
17+
providers: [],
18+
});

src/module/decorators/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
export * from './exchange.decorator';
2+
export * from './message.decorator';
3+
export * from './queue.decorator';
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import { Type } from '@hapiness/core';
2+
import { createDecorator, CoreDecorator } from '@hapiness/core';
3+
4+
export interface MessageDecoratorInterface {
5+
queue: Type<any>;
6+
exchange?: Type<any>;
7+
routingKey?: string | RegExp;
8+
filter?: {
9+
[key: string]: string | RegExp;
10+
};
11+
is_fallback?: boolean;
12+
providers?: Array<Type<any> | any>;
13+
}
14+
export const Message: CoreDecorator<MessageDecoratorInterface> = createDecorator<MessageDecoratorInterface>('Message', {
15+
queue: undefined,
16+
exchange: undefined,
17+
routingKey: undefined,
18+
filter: undefined,
19+
is_fallback: false,
20+
providers: [],
21+
});
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import { Type } from '@hapiness/core';
2+
import { createDecorator, CoreDecorator } from '@hapiness/core';
3+
import { Options } from 'amqplib';
4+
import { Bind, ChannelOptions } from '../interfaces';
5+
6+
export interface QueueDecoratorInterface {
7+
name: string;
8+
binds?: Array<Bind>;
9+
options?: Options.AssertQueue;
10+
channel?: ChannelOptions;
11+
force_json_decode?: boolean;
12+
providers?: Array<Type<any> | any>;
13+
}
14+
export const Queue: CoreDecorator<QueueDecoratorInterface> = createDecorator<QueueDecoratorInterface>('Queue', {
15+
name: undefined,
16+
binds: undefined,
17+
options: undefined,
18+
channel: undefined,
19+
force_json_decode: false,
20+
providers: [],
21+
});
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import { Observable } from 'rxjs/Observable';
2+
import { CoreModule, DependencyInjection } from '@hapiness/core';
3+
import { ConnectionManager } from '../managers/connection-manager';
4+
import { metadataFromDeclarations } from '../utils';
5+
import { ExchangeDecoratorInterface } from '../decorators';
6+
import { ExchangeManager } from '../managers/exchange-manager';
7+
import { ExchangeWrapper } from '../managers/exchange-wrapper';
8+
9+
export default function buildExchanges(modules: CoreModule[], connection: ConnectionManager): Observable<any> {
10+
return Observable.from(modules)
11+
.filter(_module => !!_module)
12+
.flatMap(_module =>
13+
metadataFromDeclarations<ExchangeDecoratorInterface>(_module.declarations, 'Exchange')
14+
.map(metadata => ({ metadata, _module }))
15+
)
16+
.flatMap(({ metadata, _module }) => DependencyInjection.instantiateComponent(metadata.token, _module.di)
17+
.map(instance => ({ instance, _module, metadata })))
18+
.flatMap(({ instance, _module, metadata }) => {
19+
const exchange = new ExchangeManager(connection.defaultChannel, new ExchangeWrapper(instance, metadata.data));
20+
return exchange.assert();
21+
})
22+
.toArray();
23+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import { CoreModule, DependencyInjection, extractMetadataByDecorator, Type } from '@hapiness/core';
2+
import { ConnectionManager } from '../managers/connection-manager';
3+
import { Observable } from 'rxjs/Observable';
4+
import { metadataFromDeclarations } from '../utils';
5+
import { QueueDecoratorInterface, ExchangeDecoratorInterface } from '../decorators';
6+
import { getChannel } from './get-channel';
7+
import { QueueManager } from '../managers/queue-manager';
8+
import { QueueWrapper } from '../managers/queue-wrapper';
9+
import registerMessages from './register-messages';
10+
import { MessageRouterInterface } from '../interfaces/message-router';
11+
import { consumeQueue } from './consume-queue';
12+
13+
export default function buildQueues(
14+
modules: CoreModule[], connection: ConnectionManager, MessageRouter: Type<MessageRouterInterface>
15+
): Observable<any> {
16+
return Observable.from(modules)
17+
.filter(_module => !!_module)
18+
.flatMap(_module =>
19+
metadataFromDeclarations<QueueDecoratorInterface>(_module.declarations, 'Queue')
20+
.map(metadata => ({ metadata, _module }))
21+
)
22+
.flatMap(({ metadata, _module }) =>
23+
DependencyInjection.instantiateComponent(metadata.token, _module.di)
24+
.map(instance => ({ instance, _module, metadata})))
25+
// Assert queue
26+
.mergeMap(({ instance, _module, metadata }) =>
27+
metadata.data.channel ?
28+
getChannel(connection, metadata.data.channel)
29+
.map(channel => ({ instance, metadata, channel, _module })) :
30+
Observable.of({ instance, metadata, channel: connection.defaultChannelManager, _module }))
31+
.mergeMap(({ instance, metadata, channel, _module }) => {
32+
const queue = new QueueManager(channel, new QueueWrapper(instance, metadata.data));
33+
return Observable.forkJoin(queue.assert(), Observable.of(metadata), Observable.of(_module));
34+
})
35+
// Bind queue
36+
.mergeMap(([queue, metadata, _module]) => {
37+
if (Array.isArray(metadata.data.binds)) {
38+
return Observable.forkJoin(
39+
metadata.data.binds.map(bind => {
40+
if (Array.isArray(bind.pattern)) {
41+
return Observable.forkJoin(bind.pattern.map(pattern => queue.bind(
42+
extractMetadataByDecorator<ExchangeDecoratorInterface>(bind.exchange, 'Exchange').name, pattern)));
43+
}
44+
45+
return queue.bind(
46+
extractMetadataByDecorator<ExchangeDecoratorInterface>(bind.exchange, 'Exchange').name, bind.pattern
47+
);
48+
})).map(() => ({ queue, _module }));
49+
}
50+
51+
return Observable.of(({ queue, _module }));
52+
})
53+
// Register messages related to queue
54+
// Consume queue
55+
// Dont consume queue if there are no messages or consume() method on queue
56+
.mergeMap(({ queue, _module }) => {
57+
const messageRouter = new MessageRouter();
58+
return registerMessages(modules, queue, messageRouter)
59+
.defaultIfEmpty(null)
60+
.filter(item => !!item)
61+
.toArray()
62+
.switchMap((registeredMessages) => {
63+
const _queue = queue.getQueue();
64+
if (registeredMessages.length || typeof _queue['onMessage'] === 'function') {
65+
return consumeQueue(queue, messageRouter);
66+
}
67+
68+
return Observable.of(null);
69+
});
70+
})
71+
.toArray();
72+
}

0 commit comments

Comments
 (0)