Skip to content

Commit b0051eb

Browse files
authored
Merge pull request #51 from hapinessjs/next
Next
2 parents cf04acc + 8aa9121 commit b0051eb

File tree

15 files changed

+186
-108
lines changed

15 files changed

+186
-108
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,10 @@ To set up your development environment:
453453
[Back to top](#table-of-contents)
454454

455455
## Change History
456+
* v1.7.0 (2019-02-27)
457+
* Add method to cancel consuming queue
458+
* Refactor consume queue to allow easier consume/cancel
459+
* Add a QueueStore to fetch all the queues manager instances
456460
* v1.6.2 (2018-11-22)
457461
* Create DI with providers for queues and exchanges
458462
* v1.6.1 (2018-11-14)

package-lock.json

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@hapiness/rabbitmq",
3-
"version": "1.6.2",
3+
"version": "1.7.0",
44
"description": "Hapiness module for rabbitmq",
55
"main": "commonjs/index.js",
66
"types": "index.d.ts",

src/module/extension/build-queues.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import registerMessages from './register-messages';
1010
import { MessageRouterInterface } from '../interfaces/message-router';
1111
import { consumeQueue } from './consume-queue';
1212
import { RabbitMQExt } from '../rabbitmq.extension';
13+
import { QueueStore } from '../managers/queue-store';
1314

1415
export default function buildQueues(
1516
modules: CoreModule[], connection: ConnectionManager, MessageRouter: Type<MessageRouterInterface>
@@ -30,7 +31,8 @@ export default function buildQueues(
3031
.map(channel => ({ instance, metadata, channel, _module })) :
3132
Observable.of({ instance, metadata, channel: connection.defaultChannelManager, _module }))
3233
.flatMap(({ instance, metadata, channel, _module }) => {
33-
const queue = new QueueManager(channel, new QueueWrapper(instance, metadata.data));
34+
const queue = new QueueManager(channel, new QueueWrapper(instance, metadata));
35+
QueueStore.getInstance().addQueue(queue);
3436
const shouldAssert = typeof metadata.data.assert === 'boolean' ? metadata.data.assert : RabbitMQExt.getConfig().assert;
3537
// Don't check queue if we assert it
3638
const assertOrCheck$ = shouldAssert

src/module/extension/consume-queue.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ const debug = require('debug')('hapiness:rabbitmq');
77

88
export function consumeQueue(queue: QueueManager, messageRouter: MessageRouterInterface): Observable<any> {
99
debug(`Creating dispatcher for queue ${queue.getName()}`);
10-
return queue.consume(
11-
(ch, message) => messageRouter.getDispatcher(ch, message))
10+
queue.setDispatcher((ch, message, params) => messageRouter.getDispatcher(ch, message, params));
11+
return queue.consume()
1212
.catch(err => Observable.of(errorHandler(err)))
1313
.do(() => debug('consumed'));
1414
}

src/module/interfaces/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ export * from './message-result';
1010
export * from './message-router';
1111
export * from './message';
1212
export * from './on-asserted';
13+
export * from './queue-dispatcher-options';
1314
export * from './queue-options';
1415
export * from './queue';
1516
export * from './rabbit-message';

src/module/interfaces/message-router.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { RabbitMessage } from './rabbit-message';
44
import { MessageResult } from './message-result';
55
import { CoreModule } from '@hapiness/core';
66
import { MessageDecoratorInterface } from '../decorators';
7+
import { QueueDispatcherOptions } from './queue-dispatcher-options';
78

89
export interface RegisterMessageOptions {
910
token: any;
@@ -13,5 +14,6 @@ export interface RegisterMessageOptions {
1314

1415
export interface MessageRouterInterface {
1516
registerMessage({ token, module }: RegisterMessageOptions): Observable<any>;
16-
getDispatcher(ch: ChannelInterface, message: RabbitMessage): Observable<() => Observable<MessageResult>>;
17+
getDispatcher(ch: ChannelInterface, message: RabbitMessage, { queue }: QueueDispatcherOptions):
18+
Observable<() => Observable<MessageResult>>;
1719
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
import { QueueWrapper } from '../managers';
2+
3+
export interface QueueDispatcherOptions {
4+
queue: QueueWrapper
5+
};

src/module/managers/channel-manager.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ export class ChannelManager extends EventEmitter {
2525
this._connectionManager.on('error', () => {
2626
this._isConnected = false;
2727
});
28+
this.setMaxListeners(0);
2829
}
2930

3031
get prefetch(): number {

src/module/managers/index.ts

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
export * from './channel-manager';
2-
export * from './channel-store';
3-
export * from './connection-manager';
4-
export * from './exchange-manager';
5-
export * from './exchange-wrapper';
6-
export * from './message-store';
7-
export * from './queue-manager';
8-
export * from './queue-wrapper';
1+
export * from './channel-manager';
2+
export * from './channel-store';
3+
export * from './connection-manager';
4+
export * from './exchange-manager';
5+
export * from './exchange-wrapper';
6+
export * from './message-store';
7+
export * from './queue-manager';
8+
export * from './queue-store';
9+
export * from './queue-wrapper';

0 commit comments

Comments
 (0)