[Bps-public-commit] anyevent-rabbitmq-rpc branch, master, updated. 0.5-3-g6d9ea23

Alex Vandiver alexmv at bestpractical.com
Tue Oct 1 15:57:08 EDT 2013


The branch, master has been updated
       via  6d9ea23172d0aff0060798a9761ccd6899c8f0cd (commit)
       via  6ef4a2f1a46640a696acacb260d1f02ec0c55722 (commit)
      from  87b33093bf9f67bd8fc570df0634cb45cad7bfa8 (commit)

Summary of changes:
 lib/AnyEvent/RabbitMQ/RPC.pm | 82 +++++++++++++++++++++++++++++---------------
 1 file changed, 54 insertions(+), 28 deletions(-)

- Log -----------------------------------------------------------------
commit 6ef4a2f1a46640a696acacb260d1f02ec0c55722
Author: Alex Vandiver <alexmv at bestpractical.com>
Date:   Tue Oct 1 15:56:42 2013 -0400

    Add a callback-based register_async

diff --git a/lib/AnyEvent/RabbitMQ/RPC.pm b/lib/AnyEvent/RabbitMQ/RPC.pm
index c573139..472645c 100644
--- a/lib/AnyEvent/RabbitMQ/RPC.pm
+++ b/lib/AnyEvent/RabbitMQ/RPC.pm
@@ -127,6 +127,27 @@ sub register {
         @_
     );
 
+    my $run = delete $args{run};
+
+    $self->register_async(
+        run => sub {
+            my %args = @_;
+            $args{on_success}->( $run->( $args{args} ) );
+        },
+        %args,
+    );
+}
+
+
+sub register_async {
+    my $self = shift;
+    my %args = (
+        name => undef,
+        run  => sub {},
+        on_failure => sub { warn "Failure: @_" },
+        @_
+    );
+
     # Ensure we have the queue
     $self->rpc_queue(
         queue      => $args{name},
@@ -149,38 +170,43 @@ sub register {
                         return if $failed;
                     }
 
-                    # Call the sub
-                    my $return;
+                    my $done = sub { $self->channel->ack };
+                    if (my $reply_to = $frame->{header}->reply_to) {
+                        $done = sub {
+                            my ($return) = @_;
+                            if ($self->{serialize}) {
+                                try {
+                                    $return = $self->{serialize}->($return);
+                                } catch {
+                                    $failed = 1;
+                                    $args{on_failure}->("Serialization failed: $_");
+                                };
+                                return if $failed;
+                            }
+
+                            $return = "0E0" if not $return;
+                            $self->channel->publish(
+                                exchange => '',
+                                routing_key => $reply_to,
+                                body => $return,
+                            );
+                            $self->channel->ack;
+                        };
+                    }
+
                     try {
-                        $return = $args{run}->( $args );
+                        $args{run}->(
+                            args => $args,
+                            on_failure => $args{on_failure},
+                            on_success => sub {
+                                $done->( @_ );
+                            },
+                        );
                     } catch {
                         $failed = 1;
-                        $args{on_failure}->("Call died: $_");
+                        $args{on_failure}->("Call failed: $_");
                     };
                     return if $failed;
-
-                    # Send the response, if they asked for it
-                    if (my $reply_to = $frame->{header}->reply_to) {
-                        if ($self->{serialize}) {
-                            try {
-                                $return = $self->{serialize}->($return);
-                            } catch {
-                                $failed = 1;
-                                $args{on_failure}->("Serialization failed: $_");
-                            };
-                            return if $failed;
-                        }
-
-                        $return = "0E0" if not $return;
-                        $self->channel->publish(
-                            exchange => '',
-                            routing_key => $reply_to,
-                            body => $return,
-                        );
-                    }
-
-                    # And finally mark the task as complete
-                    $self->channel->ack;
                 },
                 on_failure => $args{on_failure},
             );

commit 6d9ea23172d0aff0060798a9761ccd6899c8f0cd
Author: Alex Vandiver <alexmv at bestpractical.com>
Date:   Tue Oct 1 15:57:06 2013 -0400

    Set up a heartbeat and drop the max frame size

diff --git a/lib/AnyEvent/RabbitMQ/RPC.pm b/lib/AnyEvent/RabbitMQ/RPC.pm
index 472645c..4505486 100644
--- a/lib/AnyEvent/RabbitMQ/RPC.pm
+++ b/lib/AnyEvent/RabbitMQ/RPC.pm
@@ -38,7 +38,7 @@ sub new {
         $channel->();
     } else {
         AnyEvent::RabbitMQ->load_xml_spec;
-        $self->{connection} = AnyEvent::RabbitMQ->new(timeout => 1, verbose => 0);
+        $self->{connection} = AnyEvent::RabbitMQ->new(timeout => 1, verbose => 0, tune => {heartbeat => 30, frame_max => 0});
         $self->connection->connect(
             %args,
             on_success => $channel,

-----------------------------------------------------------------------



More information about the Bps-public-commit mailing list