[Bps-public-commit] anyevent-rabbitmq-rpc branch, master, created. 811ef13cd1f69eb43eb551262964e5105f9e0f00

Alex Vandiver alexmv at bestpractical.com
Sun Jul 8 23:07:37 EDT 2012


The branch, master has been created
        at  811ef13cd1f69eb43eb551262964e5105f9e0f00 (commit)

- Log -----------------------------------------------------------------
commit e570a997406df06bb6100bbcdde9abddea8090b3
Author: Alex Vandiver <alexmv at bestpractical.com>
Date:   Thu Mar 22 03:04:18 2012 -0400

    Initial implementation

diff --git a/lib/AnyEvent/RabbitMQ/RPC.pm b/lib/AnyEvent/RabbitMQ/RPC.pm
new file mode 100644
index 0000000..7f6d8ef
--- /dev/null
+++ b/lib/AnyEvent/RabbitMQ/RPC.pm
@@ -0,0 +1,247 @@
+package AnyEvent::RabbitMQ::RPC;
+
+use strict;
+use warnings;
+
+use AnyEvent::RabbitMQ;
+
+sub new {
+    my $class = shift;
+    my %args = @_;
+
+    my $self = bless {}, $class;
+
+    my $cv = AE::cv;
+
+    my $amqp = $args{connection};
+    my $channel = sub {
+        $amqp->open_channel(
+            on_success => sub {
+                $self->{channel} = shift;
+                $self->{channel}->qos;
+                $cv->send($self);
+            },
+            on_failure => sub {
+                warn "Channel failed: @_";
+                $cv->send();
+            }
+        );
+    };
+    if ($amqp) {
+        $channel->();
+    } else {
+        AnyEvent::RabbitMQ->load_xml_spec;
+        $amqp = AnyEvent::RabbitMQ->new(timeout => 1, verbose => 0);
+        $amqp->connect(
+            %args,
+            on_success => $channel,
+            on_failure => sub {
+                warn "Connect failed: @_";
+                $cv->send();
+            }
+        );
+    }
+
+    $args{serialize} ||= '';
+    if ($args{serialize} eq "YAML") {
+        require YAML::Any;
+        $self->{serialize}   = \&YAML::Any::Dump;
+        $self->{unserialize} = \&YAML::Any::Load;
+    } elsif ($args{serialize} eq "JSON") {
+        require JSON::Any;
+        JSON::Any->import;
+        my $json = JSON::Any->new;
+        $self->{serialize}   = sub { $json->objToJson( [@_] ) };
+        $self->{unserialize} = sub { (@{ $json->jsonToObj(@_) })[0] };
+    } elsif ($args{serialize} eq "Storable") {
+        require Storable;
+        $self->{serialize}   = sub { Storable::nfreeze( [@_] )};
+        $self->{unserialize} = sub { (@{ Storable::thaw(@_) })[0] };
+    }
+
+    # Block on having set up the channel
+    return $cv->recv;
+}
+
+sub channel {
+    my $self = shift;
+    return $self->{channel};
+}
+
+sub rpc_queue {
+    my $self = shift;
+    my %args = @_;
+
+    # These queues are durable -- as such, we should only need to check
+    # that they are there once per process.
+    return $args{on_success}->()
+        if $self->{queues}{$args{queue}};
+
+    $self->channel->declare_queue(
+        no_ack     => 0,
+        durable    => 1,
+        exclusive  => 0,
+        %args,
+        on_success => sub {
+            $self->{queues}{$args{queue}}++;
+            $args{on_success}->();
+        },
+    );
+}
+
+sub reply_queue {
+    my $self = shift;
+    my %args = @_;
+
+    $self->channel->declare_queue(
+        no_ack     => 1,
+        durable    => 0,
+        exclusive  => 1,
+        on_success => sub {
+            $args{on_success}->(shift->method_frame->queue);
+        },
+        on_failure => $args{on_failure},
+    );
+}
+
+sub register {
+    my $self = shift;
+    my %args = (
+        name => undef,
+        run  => sub {},
+        on_failure => sub { warn "Failure: @_" },
+        @_
+    );
+
+    # Ensure we have the queue
+    $self->rpc_queue(
+        queue      => $args{name},
+        on_success => sub {
+            # And set up a listen on it
+            $self->channel->consume(
+                queue      => $args{name},
+                no_ack     => 0,
+                on_consume => sub {
+                    my $frame = shift;
+                    my $args = $frame->{body}->payload;
+                    $args = $self->{unserialize}->($args)
+                        if $self->{unserialize};
+
+                    # Call the sub
+                    my $return = $args{run}->( $args );
+
+                    # Send the response, if they asked for it
+                    if (my $reply_to = $frame->{header}->reply_to) {
+                        $return = $self->{serialize}->($return)
+                            if $self->{serialize};
+                        $return = "0E0" if not $return;
+                        $self->channel->publish(
+                            exchange => '',
+                            routing_key => $reply_to,
+                            body => $return,
+                        );
+                    }
+
+                    # And finally mark the task as complete
+                    $self->channel->ack;
+                },
+                on_failure => $args{on_failure},
+            );
+        },
+        on_failure => $args{on_failure},
+    );
+}
+
+sub call {
+    my $self = shift;
+
+    my %args = (
+        name => undef,
+        args => undef,
+        on_sent => undef,
+        on_failure => sub { warn "Failure: @_" },
+        @_
+    );
+
+    my $finished;
+    if (defined wantarray and not $args{on_reply}) {
+        # We were called in a non-void context and without a reply
+        # callback; assume this is a synchronous call, and set up
+        # $finished to block on the reply
+        $args{on_reply} = $finished = AE::cv;
+        my $fail = $args{on_failure};
+        $args{on_failure} = sub {
+            $fail->(@_) if $fail;
+            $finished->send(undef);
+        }
+    }
+
+    my $sent_failure = $args{on_sent} ? sub {
+        $args{on_sent}->send(0);
+        $args{on_failure}->(@_);
+    } : $args{on_failure};
+
+    my $send; $send = sub {
+        my $REPLIES = shift;
+        my $args = $args{args};
+        $args = $self->{serialize}->($args)
+            if $self->{serialize};
+        $args = "0E0" if not $args;
+        $self->channel->publish(
+            exchange    => '',
+            routing_key => $args{name},
+            body        => $args,
+            header => {
+                ($REPLIES ? (reply_to => $REPLIES) : ()),
+                delivery_mode => 2, # Persistent storage
+            },
+        );
+        $args{on_sent}->send(1) if $args{on_sent};
+    };
+
+    unless ($args{on_reply}) {
+        # Fire and forget
+        $self->rpc_queue(
+            queue      => $args{name},
+            on_success => sub { $send->(undef) },
+            on_failure => $sent_failure,
+        );
+        return;
+    }
+
+    # We need to set up an ephemeral reply queue
+    $self->rpc_queue(
+        queue      => $args{name},
+        on_success => sub {
+            $self->reply_queue(
+                on_success => sub {
+                    my $REPLIES = shift;
+                    $self->channel->consume(
+                        queue => $REPLIES,
+                        no_ack => 1,
+                        on_consume => sub {
+                            my $frame = shift;
+                            # We got a reply, tear down our reply queue
+                            $self->channel->delete_queue(
+                                queue => $REPLIES,
+                            );
+                            my $return = $frame->{body}->payload;
+                            $return = $self->{unserialize}->($return)
+                                if $self->{unserialize};
+                            $args{on_reply}->($return);
+                        },
+                        on_success => sub { $send->($REPLIES) },
+                        on_failure => $sent_failure,
+                    );
+                },
+                on_failure => $sent_failure,
+            );
+        },
+        on_failure => $sent_failure,
+    );
+
+    return $finished->recv if $finished;
+    return 1;
+}
+
+1;

commit 7c3fa70e70bff4953963a8fbec11d1248ea9f4b7
Author: Alex Vandiver <alexmv at bestpractical.com>
Date:   Mon Mar 26 01:11:41 2012 -0400

    Use Try::Tiny to protect the (un)serialization and RPC call

diff --git a/lib/AnyEvent/RabbitMQ/RPC.pm b/lib/AnyEvent/RabbitMQ/RPC.pm
index 7f6d8ef..7ac2e31 100644
--- a/lib/AnyEvent/RabbitMQ/RPC.pm
+++ b/lib/AnyEvent/RabbitMQ/RPC.pm
@@ -4,6 +4,7 @@ use strict;
 use warnings;
 
 use AnyEvent::RabbitMQ;
+use Try::Tiny;
 
 sub new {
     my $class = shift;
@@ -123,17 +124,40 @@ sub register {
                 no_ack     => 0,
                 on_consume => sub {
                     my $frame = shift;
+                    my $failed;
                     my $args = $frame->{body}->payload;
-                    $args = $self->{unserialize}->($args)
-                        if $self->{unserialize};
+                    if ($self->{unserialize}) {
+                        try {
+                            $args = $self->{unserialize}->($args);
+                        } catch {
+                            $failed = 1;
+                            $args{on_failure}->("Unserialization failed: $_");
+                        };
+                        return if $failed;
+                    }
 
                     # Call the sub
-                    my $return = $args{run}->( $args );
+                    my $return;
+                    try {
+                        $return = $args{run}->( $args );
+                    } catch {
+                        $failed = 1;
+                        $args{on_failure}->("Call died: $_");
+                    };
+                    return if $failed;
 
                     # Send the response, if they asked for it
                     if (my $reply_to = $frame->{header}->reply_to) {
-                        $return = $self->{serialize}->($return)
-                            if $self->{serialize};
+                        if ($self->{serialize}) {
+                            try {
+                                $return = $self->{serialize}->($return);
+                            } catch {
+                                $failed = 1;
+                                $args{on_failure}->("Serialization failed: $_");
+                            };
+                            return if $failed;
+                        }
+
                         $return = "0E0" if not $return;
                         $self->channel->publish(
                             exchange => '',
@@ -184,8 +208,16 @@ sub call {
     my $send; $send = sub {
         my $REPLIES = shift;
         my $args = $args{args};
-        $args = $self->{serialize}->($args)
-            if $self->{serialize};
+        if ($self->{serialize}) {
+            my $failed;
+            try {
+                $args = $self->{serialize}->($args);
+            } catch {
+                $failed = 1;
+                $args{on_failure}->("Serialization failed: $_");
+            };
+            return if $failed;
+        }
         $args = "0E0" if not $args;
         $self->channel->publish(
             exchange    => '',
@@ -226,8 +258,16 @@ sub call {
                                 queue => $REPLIES,
                             );
                             my $return = $frame->{body}->payload;
-                            $return = $self->{unserialize}->($return)
-                                if $self->{unserialize};
+                            if ($self->{unserialize}) {
+                                my $failed;
+                                try {
+                                    $return = $self->{unserialize}->($return);
+                                } catch {
+                                    $args{on_failure}->("Unserialization failed: $_");
+                                    $failed = 1;
+                                };
+                                return if $failed;
+                            }
                             $args{on_reply}->($return);
                         },
                         on_success => sub { $send->($REPLIES) },

commit 07f1536aa81fb2d63e6715fc60202e7191138303
Author: Alex Vandiver <alexmv at bestpractical.com>
Date:   Mon Mar 26 01:12:45 2012 -0400

    Make the connection available to other sources via a method

diff --git a/lib/AnyEvent/RabbitMQ/RPC.pm b/lib/AnyEvent/RabbitMQ/RPC.pm
index 7ac2e31..60128a5 100644
--- a/lib/AnyEvent/RabbitMQ/RPC.pm
+++ b/lib/AnyEvent/RabbitMQ/RPC.pm
@@ -14,9 +14,9 @@ sub new {
 
     my $cv = AE::cv;
 
-    my $amqp = $args{connection};
+    $self->{connection} = $args{connection};
     my $channel = sub {
-        $amqp->open_channel(
+        $self->connection->open_channel(
             on_success => sub {
                 $self->{channel} = shift;
                 $self->{channel}->qos;
@@ -28,12 +28,12 @@ sub new {
             }
         );
     };
-    if ($amqp) {
+    if ($self->connection) {
         $channel->();
     } else {
         AnyEvent::RabbitMQ->load_xml_spec;
-        $amqp = AnyEvent::RabbitMQ->new(timeout => 1, verbose => 0);
-        $amqp->connect(
+        $self->{connection} = AnyEvent::RabbitMQ->new(timeout => 1, verbose => 0);
+        $self->connection->connect(
             %args,
             on_success => $channel,
             on_failure => sub {
@@ -64,6 +64,11 @@ sub new {
     return $cv->recv;
 }
 
+sub connection {
+    my $self = shift;
+    return $self->{connection};
+}
+
 sub channel {
     my $self = shift;
     return $self->{channel};

commit fb8f0f3fd55603d36fdaec12eab6a5aca412c04c
Author: Alex Vandiver <alexmv at bestpractical.com>
Date:   Mon Mar 26 01:13:17 2012 -0400

    Allow a more callback-oriented approach to ->new

diff --git a/lib/AnyEvent/RabbitMQ/RPC.pm b/lib/AnyEvent/RabbitMQ/RPC.pm
index 60128a5..d007244 100644
--- a/lib/AnyEvent/RabbitMQ/RPC.pm
+++ b/lib/AnyEvent/RabbitMQ/RPC.pm
@@ -13,6 +13,8 @@ sub new {
     my $self = bless {}, $class;
 
     my $cv = AE::cv;
+    my $success = $args{on_success} || $cv;
+    my $failure = $args{on_failure} || $cv;
 
     $self->{connection} = $args{connection};
     my $channel = sub {
@@ -20,11 +22,11 @@ sub new {
             on_success => sub {
                 $self->{channel} = shift;
                 $self->{channel}->qos;
-                $cv->send($self);
+                $success->($self);
             },
             on_failure => sub {
                 warn "Channel failed: @_";
-                $cv->send();
+                $failure->();
             }
         );
     };
@@ -38,7 +40,7 @@ sub new {
             on_success => $channel,
             on_failure => sub {
                 warn "Connect failed: @_";
-                $cv->send();
+                $failure->();
             }
         );
     }
@@ -60,7 +62,10 @@ sub new {
         $self->{unserialize} = sub { (@{ Storable::thaw(@_) })[0] };
     }
 
-    # Block on having set up the channel
+    # If they have a callback waiting for them, bail now
+    return if $args{on_success};
+
+    # Otherwise, block on having set up the channel
     return $cv->recv;
 }
 

commit 811ef13cd1f69eb43eb551262964e5105f9e0f00
Author: Alex Vandiver <alexmv at bestpractical.com>
Date:   Mon Mar 26 01:14:21 2012 -0400

    Use $callback->() instead of $callback->send() so they work with subs, not just condvars

diff --git a/lib/AnyEvent/RabbitMQ/RPC.pm b/lib/AnyEvent/RabbitMQ/RPC.pm
index d007244..1991d7e 100644
--- a/lib/AnyEvent/RabbitMQ/RPC.pm
+++ b/lib/AnyEvent/RabbitMQ/RPC.pm
@@ -211,7 +211,7 @@ sub call {
     }
 
     my $sent_failure = $args{on_sent} ? sub {
-        $args{on_sent}->send(0);
+        $args{on_sent}->(0);
         $args{on_failure}->(@_);
     } : $args{on_failure};
 
@@ -238,7 +238,7 @@ sub call {
                 delivery_mode => 2, # Persistent storage
             },
         );
-        $args{on_sent}->send(1) if $args{on_sent};
+        $args{on_sent}->(1) if $args{on_sent};
     };
 
     unless ($args{on_reply}) {

-----------------------------------------------------------------------



More information about the Bps-public-commit mailing list