[Bps-public-commit] anyevent-rabbitmq-rpc branch, master, updated. 5aaa332ae613446ca18d6f9bf989526fbfe6c867
Alex Vandiver
alexmv at bestpractical.com
Sun Nov 4 16:38:34 EST 2012
The branch, master has been updated
via 5aaa332ae613446ca18d6f9bf989526fbfe6c867 (commit)
via a10d8002bcf741fe327946ea067b212c2d87ab97 (commit)
via 33835a020cc28629f8d66dce6576bb22fe55928a (commit)
from 811ef13cd1f69eb43eb551262964e5105f9e0f00 (commit)
Summary of changes:
lib/AnyEvent/RabbitMQ/RPC.pm | 200 +++++++++++++++++++++++++++++++++++++++++--
1 file changed, 194 insertions(+), 6 deletions(-)
- Log -----------------------------------------------------------------
commit 33835a020cc28629f8d66dce6576bb22fe55928a
Author: Alex Vandiver <alexmv at bestpractical.com>
Date: Sat Nov 3 19:52:38 2012 -0400
Factor warnings-on-failure out
diff --git a/lib/AnyEvent/RabbitMQ/RPC.pm b/lib/AnyEvent/RabbitMQ/RPC.pm
index 1991d7e..693d49d 100644
--- a/lib/AnyEvent/RabbitMQ/RPC.pm
+++ b/lib/AnyEvent/RabbitMQ/RPC.pm
@@ -14,7 +14,10 @@ sub new {
my $cv = AE::cv;
my $success = $args{on_success} || $cv;
- my $failure = $args{on_failure} || $cv;
+ my $failure = $args{on_failure} || sub {
+ warn "@_";
+ $cv->(undef);
+ };
$self->{connection} = $args{connection};
my $channel = sub {
@@ -25,8 +28,7 @@ sub new {
$success->($self);
},
on_failure => sub {
- warn "Channel failed: @_";
- $failure->();
+ $failure->("Channel failed: @_");
}
);
};
@@ -39,8 +41,7 @@ sub new {
%args,
on_success => $channel,
on_failure => sub {
- warn "Connect failed: @_";
- $failure->();
+ $failure->("Connect failed: @_");
}
);
}
@@ -193,7 +194,7 @@ sub call {
name => undef,
args => undef,
on_sent => undef,
- on_failure => sub { warn "Failure: @_" },
+ on_failure => sub { warn "RPC Failure: @_" },
@_
);
commit a10d8002bcf741fe327946ea067b212c2d87ab97
Author: Alex Vandiver <alexmv at bestpractical.com>
Date: Sat Nov 3 19:56:08 2012 -0400
Add POD
diff --git a/lib/AnyEvent/RabbitMQ/RPC.pm b/lib/AnyEvent/RabbitMQ/RPC.pm
index 693d49d..664cb81 100644
--- a/lib/AnyEvent/RabbitMQ/RPC.pm
+++ b/lib/AnyEvent/RabbitMQ/RPC.pm
@@ -296,3 +296,188 @@ sub call {
}
1;
+
+__END__
+
+=head1 NAME
+
+AnyEvent::RabbitMQ::RPC - RPC queues via RabbitMQ
+
+=head1 SYNOPSIS
+
+ use AnyEvent::RabbitMQ::RPC;
+
+ my $rpc = AnyEvent::RabbitMQ::RPC->new(
+ host => 'localhost',
+ port => 5672,
+ user => 'guest',
+ pass => 'guest',
+ vhost => '/',
+ serialize => 'Storable',
+ );
+
+ print $rpc->call(
+ name => 'MethodName',
+ args => { some => "data" },
+ );
+
+=head1 DESCRIPTION
+
+C<AnyEvent::RabbitMQ::RPC> provides an AnyEvent-based reliable job queue
+atop the RabbitMQ event server. This can be used as a replacement for
+similar reliable job queue/RPC client-worker models, such as
+L<TheSchwartz>.
+
+RPC classes can L</register> calls that they can handle, and/or use
+L</call> to request another client perform work.
+
+=head1 METHODS
+
+=head2 new
+
+Create a new RPC object. Either an existing L<AnyEvent::RabbitMQ>
+object can be passed using the C<connection> argument, or all of the
+provided parameters will be passed through to
+L<AnyEvent::RabbitMQ/connect> on a new object. In the latter case,
+common parameters include C<host>, C<port>, C<user>, C<pass>, and
+C<vhost>.
+
+If you wish to pass complex data structures back and forth to remote
+workers, a value must be passed for C<serialize>. Both worker and
+client must be configured to use the same serialization method. The
+available options are:
+
+=over
+
+=item YAML
+
+Use L<YAML::Any/Dump> and L<YAML::Any/Load> to serialize and deserialize
+data.
+
+=item JSON
+
+Use L<JSON::Any/objToJson> and L<JSON::Any/jsonToObj> to serialize and
+deserialize.
+
+=item Storable
+
+Use L<Storable/nfreeze> and L<Storable/thaw> to serialize and
+deserialize.
+
+=back
+
+Two callback points, C<on_success> and C<on_failure>, are provided.
+C<on_success> will be passed the initialized L<AnyEvent::RabbitMQ::RPC>
+object; C<on_failure> will be passed the reason for the failure. If no
+C<on_success> is provided, this call will block using an
+L<AnyEvent::CondVar> until the connection is established.
+
+
+=head2 register name => C<STRING>, run => C<SUBREF>
+
+Establishes that the current process knows how to run the job named
+C<STRING>, whose definition is provided by the subroutine C<SUBREF>.
+The subroutine will be called whenever a job is removed from the queue;
+it will be called with the argument passed to L</call>, which may be
+more than a string if C<serialize> was set during L</new>.
+
+Due to a limitation of C<AnyEvent::RabbitMQ>, false values returned by
+the subroutine are transformed into the true-valued string C<0E0>.
+Subroutines which fail to execute to completion (via C<die> or other
+runtime execution failure) will re-insert the job into the queue for the
+next worker to process.
+
+Returning non-string values requires that both worker and client have
+been created with the same (non-empty) value of C<serialize> passed to
+L</new>.
+
+A callback C<on_failure> may optionally be passed, which will be called
+with an error message if suitable channels cannot be configured on the
+RabbitMQ server.
+
+=head2 call name => C<STRING>, args => C<VALUE>
+
+Submits a job to the job queue. The C<VALUE> provided must be a string,
+unless C<serialize> was passed to L</new>.
+
+Three callbacks exist:
+
+=over
+
+=item on_reply
+
+Called when the job has been completed successfully, and will be passed
+the return value of the job. Returning non-string values requires that
+both worker and client have been created with the same (non-empty) value
+of C<serialize> passed to L</new>.
+
+=item on_sent
+
+Called once the job has been submitted, with a true value if the job was
+submitted successfully. A false value will be passed if the requisite
+channels could not be configured, and the job was not submitted
+successfully.
+
+=item on_failure
+
+Called if there was an error submitting the job, and is passed the
+reason for the failure. If C<on_sent> was also provided, this is
+I<also> called after C<on_sent> is called with a false value.
+
+=back
+
+
+If no value for C<on_reply> is provided, and the C<call> function is not
+in void context, an C<AnyEvent::CondVar> is used to automatically block
+until a reply is received from a worker; the return value of the reply
+is then returned from L</call>.
+
+
+=head2 connection
+
+Returns the L<AnyEvent::RabbitMQ> connection used by this object.
+
+=head2 channel
+
+Returns the L<AnyEvent::RabbitMQ::Channel> used by this object.
+
+=head2 rpc_queue queue => C<NAME>, on_success => C<CALLBACK>
+
+Creates the queue with the given name, used to schedule jobs. These
+queues are durable, and thus persist across program invocations and
+RabbitMQ restarts.
+
+The C<on_success> callback is called once the queue is known to exist.
+The C<on_failure> may alternately be called with a reason if the queue
+creation fails.
+
+=head2 reply_queue on_success => C<CALLBACK>
+
+Creates a temporary queue used to reply to job requests. These queues
+are anonymous and ephemeral, and are torn down after each RPC call.
+
+The C<on_success> callback is called with the name of the queue that has
+been created. The C<on_failure> may alternately be called with a reason
+if the queue creation fails.
+
+
+=head1 AUTHOR
+
+Alex Vandiver C<< <alexmv at bestpractical.com> >>
+
+=head1 BUGS
+
+All bugs should be reported via
+L<http://rt.cpan.org/Public/Dist/Display.html?Name=AnyEvent-RabbitMQ-RPC>
+or L<bug-AnyEvent-RabbitMQ-RPC at rt.cpan.org>.
+
+
+=head1 LICENSE AND COPYRIGHT
+
+This software is Copyright (c) 2012 by Best Practical Solutions
+
+This is free software, licensed under:
+
+ The GNU General Public License, Version 2, June 1991
+
+=cut
commit 5aaa332ae613446ca18d6f9bf989526fbfe6c867
Author: Alex Vandiver <alexmv at bestpractical.com>
Date: Sun Nov 4 16:33:41 2012 -0500
Add version 0.5
diff --git a/lib/AnyEvent/RabbitMQ/RPC.pm b/lib/AnyEvent/RabbitMQ/RPC.pm
index 664cb81..c573139 100644
--- a/lib/AnyEvent/RabbitMQ/RPC.pm
+++ b/lib/AnyEvent/RabbitMQ/RPC.pm
@@ -6,6 +6,8 @@ use warnings;
use AnyEvent::RabbitMQ;
use Try::Tiny;
+our $VERSION = '0.5';
+
sub new {
my $class = shift;
my %args = @_;
-----------------------------------------------------------------------
More information about the Bps-public-commit
mailing list