[Bps-public-commit] anyevent-rabbitmq-rpc branch, master, updated. 5aaa332ae613446ca18d6f9bf989526fbfe6c867

Alex Vandiver alexmv at bestpractical.com
Sun Nov 4 16:38:34 EST 2012


The branch, master has been updated
       via  5aaa332ae613446ca18d6f9bf989526fbfe6c867 (commit)
       via  a10d8002bcf741fe327946ea067b212c2d87ab97 (commit)
       via  33835a020cc28629f8d66dce6576bb22fe55928a (commit)
      from  811ef13cd1f69eb43eb551262964e5105f9e0f00 (commit)

Summary of changes:
 lib/AnyEvent/RabbitMQ/RPC.pm | 200 +++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 194 insertions(+), 6 deletions(-)

- Log -----------------------------------------------------------------
commit 33835a020cc28629f8d66dce6576bb22fe55928a
Author: Alex Vandiver <alexmv at bestpractical.com>
Date:   Sat Nov 3 19:52:38 2012 -0400

    Factor warnings-on-failure out

diff --git a/lib/AnyEvent/RabbitMQ/RPC.pm b/lib/AnyEvent/RabbitMQ/RPC.pm
index 1991d7e..693d49d 100644
--- a/lib/AnyEvent/RabbitMQ/RPC.pm
+++ b/lib/AnyEvent/RabbitMQ/RPC.pm
@@ -14,7 +14,10 @@ sub new {
 
     my $cv = AE::cv;
     my $success = $args{on_success} || $cv;
-    my $failure = $args{on_failure} || $cv;
+    my $failure = $args{on_failure} || sub {
+        warn "@_";
+        $cv->(undef);
+    };
 
     $self->{connection} = $args{connection};
     my $channel = sub {
@@ -25,8 +28,7 @@ sub new {
                 $success->($self);
             },
             on_failure => sub {
-                warn "Channel failed: @_";
-                $failure->();
+                $failure->("Channel failed: @_");
             }
         );
     };
@@ -39,8 +41,7 @@ sub new {
             %args,
             on_success => $channel,
             on_failure => sub {
-                warn "Connect failed: @_";
-                $failure->();
+                $failure->("Connect failed: @_");
             }
         );
     }
@@ -193,7 +194,7 @@ sub call {
         name => undef,
         args => undef,
         on_sent => undef,
-        on_failure => sub { warn "Failure: @_" },
+        on_failure => sub { warn "RPC Failure: @_" },
         @_
     );
 

commit a10d8002bcf741fe327946ea067b212c2d87ab97
Author: Alex Vandiver <alexmv at bestpractical.com>
Date:   Sat Nov 3 19:56:08 2012 -0400

    Add POD

diff --git a/lib/AnyEvent/RabbitMQ/RPC.pm b/lib/AnyEvent/RabbitMQ/RPC.pm
index 693d49d..664cb81 100644
--- a/lib/AnyEvent/RabbitMQ/RPC.pm
+++ b/lib/AnyEvent/RabbitMQ/RPC.pm
@@ -296,3 +296,188 @@ sub call {
 }
 
 1;
+
+__END__
+
+=head1 NAME
+
+AnyEvent::RabbitMQ::RPC - RPC queues via RabbitMQ
+
+=head1 SYNOPSIS
+
+    use AnyEvent::RabbitMQ::RPC;
+
+    my $rpc = AnyEvent::RabbitMQ::RPC->new(
+        host   => 'localhost',
+        port   => 5672,
+        user   => 'guest',
+        pass   => 'guest',
+        vhost  => '/',
+        serialize => 'Storable',
+    );
+
+    print $rpc->call(
+        name => 'MethodName',
+        args => { some => "data" },
+    );
+
+=head1 DESCRIPTION
+
+C<AnyEvent::RabbitMQ::RPC> provides an AnyEvent-based reliable job queue
+atop the RabbitMQ event server.  This can be used as a replacement for
+similar reliable job queue/RPC client-worker models, such as
+L<TheSchwartz>.
+
+RPC classes can L<register> calls that they can handle, and/or use
+L<call> to request another client perform work.
+
+=head1 METHODS
+
+=head2 new
+
+Create a new RPC object.  Either an existing L<AnyEvent::RabbitMQ>
+object can be passed using the C<connection> argument, or the all of the
+provided parameters will be passed through to
+L<AnyEvent::RabbitMQ/connect> on a new object.  In the latter case,
+common parameters include C<host>, C<port>, C<user>, C<pass>, and
+C<vhost>.
+
+If you wish to pass complex data structures back and forth to remote
+workers, a value must be passed for C<serialize>.  Both worker and
+client must be configured to use the same serialization method.  The
+available options are:
+
+=over
+
+=item YAML
+
+Use L<YAML::Any/Dump> and L<YAML::Any/Load> to serialize and deserialize
+data.
+
+=item JSON
+
+Use L<JSON::Any/objToJson> and L<JSON::Any/jsonToObj> to serialize and
+deserialize.
+
+=item Storable
+
+Use L<Storable/nfreeze> and L<Storable/thaw> to serialize and
+deserialize.
+
+=back
+
+Two callback points, C<on_success> and C<on_failure>, are provided.
+C<on_success> will be passed the initialized L<AnyEvent::RabbitMQ::RPC>
+object; C<on_failure> will be passed the reason for the failure.  If no
+C<on_success> is provided, this call will block using an
+L<AnyEvent::CondVar> until the connection is established.
+
+
+=head2 register name => C<STRING>, run => C<SUBREF>
+
+Establishes that the current process knows how to run the job named
+C<STRING>, whose definition is provided by the surboutine C<SUBREF>.
+The subroutine will be called whenever a job is removed from the queue;
+it will be called with the argument passed to C</call>, which may be
+more than a string if C<serialize> was set during L</new>.
+
+Due to a limitation of C<AnyEvent::RabbitMQ>, false values returned by
+the subroutine are transformed into the true-valued string C<0E0>.
+Subroutines which fail to execute to completion (via C<die> or other
+runtime execution failure) will re-insert the job into the queue for the
+next worker to process.
+
+Returning non-string values requires that both worker and client have
+been created with the same (non-empty) value of C<serialize> passed to
+L</new>.
+
+A callback C<on_failure> may optionally be passed, which will be called
+with an error message if suitable channels cannoot be configured on the
+RabbitMQ server.
+
+=head2 call name => C<STRING>, args => C<VALUE>
+
+Submits a job to the job queue.  The C<VALUE> provided must be a string,
+unless C<serialize> was passed to L</new>.
+
+Three callbacks exist:
+
+=over
+
+=item on_reply
+
+Called when the job has been completed successfully, and will be passed
+the return value of the job.  Returning non-string values requires that
+both worker and client have been created with the same (non-empty) value
+of C<serialize> passed to L</new>.
+
+=item on_sent
+
+Called once the job has been submitted, with a true value if the job was
+submitted successfully.  A false value will be passed if the requisite
+channels could not be configured, and the job was not submitted
+sucessfully.
+
+=item on_failure
+
+Called if there was an error submitting the job, and is passed the
+reason for the failure.  If C<on_sent> was also provided, this is
+I<also> called after C<on_sent> is called with a false value.
+
+=back
+
+
+If no value for C<on_reply> is provided, and the C<call> function is not
+in void context, a C<AnyEvent::CondVar> is used to automatically block
+until a reply is received from a worker; the return value of the reply
+is then returned from L</call>.
+
+
+=head2 connection
+
+Returns the L<AnyEvent::RabbitMQ> connection used by this object.
+
+=head2 channel
+
+Returns the L<AnyEvent::RabbitMQ::Channel> used by this object.
+
+=head2 rpc_queue queue => C<NAME>, on_success => C<CALLBACK>
+
+Creates the queue with the given name, used to schedule jobs.  These
+queues are durable, and thus persist across program invocations and
+RabbitMQ restarts.
+
+The C<on_success> callback is called once the queue is known to exist.
+The C<on_failure> may alternately be called with a reason if the queue
+creation fails.
+
+=head2 reply_queue on_success => C<CALLBACK>
+
+Creates a temporary queue used to reply to job requests.  These queues
+are anonymous and ephemeral, and are torn down after each RPC call.
+
+The C<on_success> callback is called with the name of the queue that has
+been created.  The C<on_failure> may alternately be called with a reason
+if the queue creation fails.
+
+
+=head1 AUTHOR
+
+Alex Vandiver C<< <alexmv at bestpractical.com> >>
+
+=head1 BUGS
+
+All bugs should be reported via
+L<http://rt.cpan.org/Public/Dist/Display.html?Name=AnyEvent-RabbitMQ-RPC>
+or L<bug-AnyEvent-RabbitMQ-RPC at rt.cpan.org>.
+
+
+=head1 LICENSE AND COPYRIGHT
+
+This software is Copyright (c) 2012 by Best Practical Solutions
+
+This is free software, licensed under:
+
+  The GNU General Public License, Version 2, June 1991
+
+=cut

commit 5aaa332ae613446ca18d6f9bf989526fbfe6c867
Author: Alex Vandiver <alexmv at bestpractical.com>
Date:   Sun Nov 4 16:33:41 2012 -0500

    Add version 0.5

diff --git a/lib/AnyEvent/RabbitMQ/RPC.pm b/lib/AnyEvent/RabbitMQ/RPC.pm
index 664cb81..c573139 100644
--- a/lib/AnyEvent/RabbitMQ/RPC.pm
+++ b/lib/AnyEvent/RabbitMQ/RPC.pm
@@ -6,6 +6,8 @@ use warnings;
 use AnyEvent::RabbitMQ;
 use Try::Tiny;
 
+our $VERSION = '0.5';
+
 sub new {
     my $class = shift;
     my %args = @_;

-----------------------------------------------------------------------



More information about the Bps-public-commit mailing list