[Bps-public-commit] anyevent-rabbitmq-rpc branch, master, updated. 0.5-3-g6d9ea23
Alex Vandiver
alexmv at bestpractical.com
Tue Oct 1 15:57:08 EDT 2013
The branch, master, has been updated
via 6d9ea23172d0aff0060798a9761ccd6899c8f0cd (commit)
via 6ef4a2f1a46640a696acacb260d1f02ec0c55722 (commit)
from 87b33093bf9f67bd8fc570df0634cb45cad7bfa8 (commit)
Summary of changes:
lib/AnyEvent/RabbitMQ/RPC.pm | 82 +++++++++++++++++++++++++++++---------------
1 file changed, 54 insertions(+), 28 deletions(-)
- Log -----------------------------------------------------------------
commit 6ef4a2f1a46640a696acacb260d1f02ec0c55722
Author: Alex Vandiver <alexmv at bestpractical.com>
Date: Tue Oct 1 15:56:42 2013 -0400
Add a callback-based register_async
diff --git a/lib/AnyEvent/RabbitMQ/RPC.pm b/lib/AnyEvent/RabbitMQ/RPC.pm
index c573139..472645c 100644
--- a/lib/AnyEvent/RabbitMQ/RPC.pm
+++ b/lib/AnyEvent/RabbitMQ/RPC.pm
@@ -127,6 +127,27 @@ sub register {
@_
);
+ my $run = delete $args{run};
+
+ $self->register_async(
+ run => sub {
+ my %args = @_;
+ $args{on_success}->( $run->( $args{args} ) );
+ },
+ %args,
+ );
+}
+
+
+sub register_async {
+ my $self = shift;
+ my %args = (
+ name => undef,
+ run => sub {},
+ on_failure => sub { warn "Failure: @_" },
+ @_
+ );
+
# Ensure we have the queue
$self->rpc_queue(
queue => $args{name},
@@ -149,38 +170,43 @@ sub register {
return if $failed;
}
- # Call the sub
- my $return;
+ my $done = sub { $self->channel->ack };
+ if (my $reply_to = $frame->{header}->reply_to) {
+ $done = sub {
+ my ($return) = @_;
+ if ($self->{serialize}) {
+ try {
+ $return = $self->{serialize}->($return);
+ } catch {
+ $failed = 1;
+ $args{on_failure}->("Serialization failed: $_");
+ };
+ return if $failed;
+ }
+
+ $return = "0E0" if not $return;
+ $self->channel->publish(
+ exchange => '',
+ routing_key => $reply_to,
+ body => $return,
+ );
+ $self->channel->ack;
+ };
+ }
+
try {
- $return = $args{run}->( $args );
+ $args{run}->(
+ args => $args,
+ on_failure => $args{on_failure},
+ on_success => sub {
+ $done->( @_ );
+ },
+ );
} catch {
$failed = 1;
- $args{on_failure}->("Call died: $_");
+ $args{on_failure}->("Call failed: $_");
};
return if $failed;
-
- # Send the response, if they asked for it
- if (my $reply_to = $frame->{header}->reply_to) {
- if ($self->{serialize}) {
- try {
- $return = $self->{serialize}->($return);
- } catch {
- $failed = 1;
- $args{on_failure}->("Serialization failed: $_");
- };
- return if $failed;
- }
-
- $return = "0E0" if not $return;
- $self->channel->publish(
- exchange => '',
- routing_key => $reply_to,
- body => $return,
- );
- }
-
- # And finally mark the task as complete
- $self->channel->ack;
},
on_failure => $args{on_failure},
);
commit 6d9ea23172d0aff0060798a9761ccd6899c8f0cd
Author: Alex Vandiver <alexmv at bestpractical.com>
Date: Tue Oct 1 15:57:06 2013 -0400
Set up a heartbeat and drop the max frame size
diff --git a/lib/AnyEvent/RabbitMQ/RPC.pm b/lib/AnyEvent/RabbitMQ/RPC.pm
index 472645c..4505486 100644
--- a/lib/AnyEvent/RabbitMQ/RPC.pm
+++ b/lib/AnyEvent/RabbitMQ/RPC.pm
@@ -38,7 +38,7 @@ sub new {
$channel->();
} else {
AnyEvent::RabbitMQ->load_xml_spec;
- $self->{connection} = AnyEvent::RabbitMQ->new(timeout => 1, verbose => 0);
+ $self->{connection} = AnyEvent::RabbitMQ->new(timeout => 1, verbose => 0, tune => {heartbeat => 30, frame_max => 0});
$self->connection->connect(
%args,
on_success => $channel,
-----------------------------------------------------------------------
More information about the Bps-public-commit mailing list