[Bps-public-commit] anyevent-rabbitmq-rpc branch, master, created. 811ef13cd1f69eb43eb551262964e5105f9e0f00
Alex Vandiver
alexmv at bestpractical.com
Sun Jul 8 23:07:37 EDT 2012
The branch, master has been created
at 811ef13cd1f69eb43eb551262964e5105f9e0f00 (commit)
- Log -----------------------------------------------------------------
commit e570a997406df06bb6100bbcdde9abddea8090b3
Author: Alex Vandiver <alexmv at bestpractical.com>
Date: Thu Mar 22 03:04:18 2012 -0400
Initial implementation
diff --git a/lib/AnyEvent/RabbitMQ/RPC.pm b/lib/AnyEvent/RabbitMQ/RPC.pm
new file mode 100644
index 0000000..7f6d8ef
--- /dev/null
+++ b/lib/AnyEvent/RabbitMQ/RPC.pm
@@ -0,0 +1,247 @@
+package AnyEvent::RabbitMQ::RPC;
+
+use strict;
+use warnings;
+
+use AnyEvent::RabbitMQ;
+
+sub new { # Constructor: opens a channel (connecting first if no connection is given) and blocks until done
+ my $class = shift;
+ my %args = @_; # Keys read here: connection, serialize; the whole hash is also passed on to ->connect
+
+ my $self = bless {}, $class;
+ # Condvar sent $self on success and nothing on failure; the ->recv at the end waits on it
+ my $cv = AE::cv;
+ # Channel setup is kept in a closure so it can run right away or as the connect callback
+ my $amqp = $args{connection};
+ my $channel = sub {
+ $amqp->open_channel(
+ on_success => sub {
+ $self->{channel} = shift; # First callback argument is stored as the channel
+ $self->{channel}->qos;
+ $cv->send($self);
+ },
+ on_failure => sub {
+ warn "Channel failed: @_";
+ $cv->send();
+ }
+ );
+ };
+ if ($amqp) { # Caller supplied a connection: open the channel on it immediately
+ $channel->();
+ } else { # No connection supplied: create one and open the channel once it connects
+ AnyEvent::RabbitMQ->load_xml_spec;
+ $amqp = AnyEvent::RabbitMQ->new(timeout => 1, verbose => 0);
+ $amqp->connect(
+ %args, # Caller's arguments come first, so the two callbacks below take precedence
+ on_success => $channel,
+ on_failure => sub {
+ warn "Connect failed: @_";
+ $cv->send();
+ }
+ );
+ }
+ # Pick the (un)serializer pair; any value other than YAML, JSON or Storable leaves both unset
+ $args{serialize} ||= '';
+ if ($args{serialize} eq "YAML") {
+ require YAML::Any;
+ $self->{serialize} = \&YAML::Any::Dump;
+ $self->{unserialize} = \&YAML::Any::Load;
+ } elsif ($args{serialize} eq "JSON") {
+ require JSON::Any;
+ JSON::Any->import;
+ my $json = JSON::Any->new;
+ $self->{serialize} = sub { $json->objToJson( [@_] ) }; # Arguments are wrapped in an array ref before encoding
+ $self->{unserialize} = sub { (@{ $json->jsonToObj(@_) })[0] }; # Decode, then return the first array element
+ } elsif ($args{serialize} eq "Storable") {
+ require Storable;
+ $self->{serialize} = sub { Storable::nfreeze( [@_] )}; # Same array-ref wrapping as the JSON case
+ $self->{unserialize} = sub { (@{ Storable::thaw(@_) })[0] };
+ }
+
+ # Block until the channel is set up; returns what was sent to $cv ($self, or nothing on failure)
+ return $cv->recv;
+}
+
+sub channel { # Accessor: returns the value stored in $self->{channel}
+ my $self = shift;
+ return $self->{channel};
+}
+
+sub rpc_queue { # Declare the queue named $args{queue} unless this object already did, then call $args{on_success}
+ my $self = shift;
+ my %args = @_;
+
+ # These queues are durable -- as such, we should only need to check
+ # that they are there once per process.
+ return $args{on_success}->()
+ if $self->{queues}{$args{queue}};
+ # Not seen yet: declare it. %args follows the three defaults, so caller-supplied keys override them
+ $self->channel->declare_queue(
+ no_ack => 0,
+ durable => 1,
+ exclusive => 0,
+ %args,
+ on_success => sub { # Listed after %args, so this wrapper replaces the caller's on_success
+ $self->{queues}{$args{queue}}++; # Remember the queue so later calls return early
+ $args{on_success}->();
+ },
+ );
+}
+
+sub reply_queue { # Declare a non-durable, exclusive queue and pass its name to $args{on_success}
+ my $self = shift;
+ my %args = @_;
+ # No queue name is given to declare_queue; the name is read back from the success frame below
+ $self->channel->declare_queue(
+ no_ack => 1,
+ durable => 0,
+ exclusive => 1,
+ on_success => sub {
+ $args{on_success}->(shift->method_frame->queue); # Queue name taken from the callback's first argument
+ },
+ on_failure => $args{on_failure},
+ );
+}
+
+sub register { # Consume from the queue $args{name}, running $args{run} on each message and replying if asked
+ my $self = shift;
+ my %args = (
+ name => undef,
+ run => sub {},
+ on_failure => sub { warn "Failure: @_" },
+ @_
+ );
+ # Arguments: name (queue name), run (handler, defaults to a no-op), on_failure (defaults to a warn)
+ # Ensure we have the queue
+ $self->rpc_queue(
+ queue => $args{name},
+ on_success => sub {
+ # And set up a listen on it
+ $self->channel->consume(
+ queue => $args{name},
+ no_ack => 0,
+ on_consume => sub {
+ my $frame = shift;
+ my $args = $frame->{body}->payload; # Message body; unserialized below if an unserializer is set
+ $args = $self->{unserialize}->($args)
+ if $self->{unserialize};
+ # Note: $args (this message's payload) is a separate variable from %args (register's options)
+ # Call the sub
+ my $return = $args{run}->( $args );
+
+ # Send the response, if they asked for it
+ if (my $reply_to = $frame->{header}->reply_to) {
+ $return = $self->{serialize}->($return)
+ if $self->{serialize};
+ $return = "0E0" if not $return; # Any false result (undef, 0, '') is sent as the string "0E0"
+ $self->channel->publish(
+ exchange => '',
+ routing_key => $reply_to,
+ body => $return,
+ );
+ }
+
+ # And finally mark the task as complete
+ $self->channel->ack; # Called with no arguments
+ },
+ on_failure => $args{on_failure},
+ );
+ },
+ on_failure => $args{on_failure},
+ );
+}
+
+sub call { # Publish a request to the queue $args{name}; optionally wait for, or be called back with, the reply
+ my $self = shift;
+ # Arguments: name (queue), args (payload), on_reply, on_sent (used via ->send), on_failure (defaults to a warn)
+ my %args = (
+ name => undef,
+ args => undef,
+ on_sent => undef,
+ on_failure => sub { warn "Failure: @_" },
+ @_
+ );
+
+ my $finished;
+ if (defined wantarray and not $args{on_reply}) {
+ # We were called in a non-void context and without a reply
+ # callback; assume this is a synchronous call, and set up
+ # $finished to block on the reply
+ $args{on_reply} = $finished = AE::cv;
+ my $fail = $args{on_failure};
+ $args{on_failure} = sub {
+ $fail->(@_) if $fail;
+ $finished->send(undef); # Unblock the ->recv at the end; the caller then gets undef
+ }
+ }
+ # When on_sent is given, a failure before publishing also reports 0 to it
+ my $sent_failure = $args{on_sent} ? sub {
+ $args{on_sent}->send(0);
+ $args{on_failure}->(@_);
+ } : $args{on_failure};
+ # $send publishes the request; its argument is the reply queue name, or undef for fire-and-forget
+ my $send; $send = sub {
+ my $REPLIES = shift;
+ my $args = $args{args};
+ $args = $self->{serialize}->($args)
+ if $self->{serialize};
+ $args = "0E0" if not $args; # Any false payload (undef, 0, '') is sent as the string "0E0"
+ $self->channel->publish(
+ exchange => '',
+ routing_key => $args{name},
+ body => $args,
+ header => {
+ ($REPLIES ? (reply_to => $REPLIES) : ()),
+ delivery_mode => 2, # Persistent storage
+ },
+ );
+ $args{on_sent}->send(1) if $args{on_sent}; # Report 1 once publish has been called
+ };
+
+ unless ($args{on_reply}) {
+ # Fire and forget
+ $self->rpc_queue(
+ queue => $args{name},
+ on_success => sub { $send->(undef) },
+ on_failure => $sent_failure,
+ );
+ return;
+ }
+
+ # We need to set up an ephemeral reply queue
+ $self->rpc_queue(
+ queue => $args{name},
+ on_success => sub {
+ $self->reply_queue(
+ on_success => sub {
+ my $REPLIES = shift;
+ $self->channel->consume(
+ queue => $REPLIES,
+ no_ack => 1,
+ on_consume => sub {
+ my $frame = shift;
+ # We got a reply, tear down our reply queue
+ $self->channel->delete_queue(
+ queue => $REPLIES,
+ );
+ my $return = $frame->{body}->payload;
+ $return = $self->{unserialize}->($return)
+ if $self->{unserialize};
+ $args{on_reply}->($return); # Either the caller's callback or the $finished condvar, invoked as a sub
+ },
+ on_success => sub { $send->($REPLIES) }, # Publish only after the consume on the reply queue succeeded
+ on_failure => $sent_failure,
+ );
+ },
+ on_failure => $sent_failure,
+ );
+ },
+ on_failure => $sent_failure,
+ );
+ # Synchronous case: block until on_reply or on_failure fires and return what was sent; otherwise return 1
+ return $finished->recv if $finished;
+ return 1;
+}
+
+1;
commit 7c3fa70e70bff4953963a8fbec11d1248ea9f4b7
Author: Alex Vandiver <alexmv at bestpractical.com>
Date: Mon Mar 26 01:11:41 2012 -0400
Use Try::Tiny to protect the (un)serialization and RPC call
diff --git a/lib/AnyEvent/RabbitMQ/RPC.pm b/lib/AnyEvent/RabbitMQ/RPC.pm
index 7f6d8ef..7ac2e31 100644
--- a/lib/AnyEvent/RabbitMQ/RPC.pm
+++ b/lib/AnyEvent/RabbitMQ/RPC.pm
@@ -4,6 +4,7 @@ use strict;
use warnings;
use AnyEvent::RabbitMQ;
+use Try::Tiny;
sub new {
my $class = shift;
@@ -123,17 +124,40 @@ sub register {
no_ack => 0,
on_consume => sub {
my $frame = shift;
+ my $failed;
my $args = $frame->{body}->payload;
- $args = $self->{unserialize}->($args)
- if $self->{unserialize};
+ if ($self->{unserialize}) {
+ try {
+ $args = $self->{unserialize}->($args);
+ } catch {
+ $failed = 1;
+ $args{on_failure}->("Unserialization failed: $_");
+ };
+ return if $failed;
+ }
# Call the sub
- my $return = $args{run}->( $args );
+ my $return;
+ try {
+ $return = $args{run}->( $args );
+ } catch {
+ $failed = 1;
+ $args{on_failure}->("Call died: $_");
+ };
+ return if $failed;
# Send the response, if they asked for it
if (my $reply_to = $frame->{header}->reply_to) {
- $return = $self->{serialize}->($return)
- if $self->{serialize};
+ if ($self->{serialize}) {
+ try {
+ $return = $self->{serialize}->($return);
+ } catch {
+ $failed = 1;
+ $args{on_failure}->("Serialization failed: $_");
+ };
+ return if $failed;
+ }
+
$return = "0E0" if not $return;
$self->channel->publish(
exchange => '',
@@ -184,8 +208,16 @@ sub call {
my $send; $send = sub {
my $REPLIES = shift;
my $args = $args{args};
- $args = $self->{serialize}->($args)
- if $self->{serialize};
+ if ($self->{serialize}) {
+ my $failed;
+ try {
+ $args = $self->{serialize}->($args);
+ } catch {
+ $failed = 1;
+ $args{on_failure}->("Serialization failed: $_");
+ };
+ return if $failed;
+ }
$args = "0E0" if not $args;
$self->channel->publish(
exchange => '',
@@ -226,8 +258,16 @@ sub call {
queue => $REPLIES,
);
my $return = $frame->{body}->payload;
- $return = $self->{unserialize}->($return)
- if $self->{unserialize};
+ if ($self->{unserialize}) {
+ my $failed;
+ try {
+ $return = $self->{unserialize}->($return);
+ } catch {
+ $args{on_failure}->("Unserialization failed: $_");
+ $failed = 1;
+ };
+ return if $failed;
+ }
$args{on_reply}->($return);
},
on_success => sub { $send->($REPLIES) },
commit 07f1536aa81fb2d63e6715fc60202e7191138303
Author: Alex Vandiver <alexmv at bestpractical.com>
Date: Mon Mar 26 01:12:45 2012 -0400
Make the connection available to other sources via a method
diff --git a/lib/AnyEvent/RabbitMQ/RPC.pm b/lib/AnyEvent/RabbitMQ/RPC.pm
index 7ac2e31..60128a5 100644
--- a/lib/AnyEvent/RabbitMQ/RPC.pm
+++ b/lib/AnyEvent/RabbitMQ/RPC.pm
@@ -14,9 +14,9 @@ sub new {
my $cv = AE::cv;
- my $amqp = $args{connection};
+ $self->{connection} = $args{connection};
my $channel = sub {
- $amqp->open_channel(
+ $self->connection->open_channel(
on_success => sub {
$self->{channel} = shift;
$self->{channel}->qos;
@@ -28,12 +28,12 @@ sub new {
}
);
};
- if ($amqp) {
+ if ($self->connection) {
$channel->();
} else {
AnyEvent::RabbitMQ->load_xml_spec;
- $amqp = AnyEvent::RabbitMQ->new(timeout => 1, verbose => 0);
- $amqp->connect(
+ $self->{connection} = AnyEvent::RabbitMQ->new(timeout => 1, verbose => 0);
+ $self->connection->connect(
%args,
on_success => $channel,
on_failure => sub {
@@ -64,6 +64,11 @@ sub new {
return $cv->recv;
}
+sub connection { # Accessor: returns the value stored in $self->{connection}
+ my $self = shift;
+ return $self->{connection};
+}
+
sub channel {
my $self = shift;
return $self->{channel};
commit fb8f0f3fd55603d36fdaec12eab6a5aca412c04c
Author: Alex Vandiver <alexmv at bestpractical.com>
Date: Mon Mar 26 01:13:17 2012 -0400
Allow a more callback-oriented approach to ->new
diff --git a/lib/AnyEvent/RabbitMQ/RPC.pm b/lib/AnyEvent/RabbitMQ/RPC.pm
index 60128a5..d007244 100644
--- a/lib/AnyEvent/RabbitMQ/RPC.pm
+++ b/lib/AnyEvent/RabbitMQ/RPC.pm
@@ -13,6 +13,8 @@ sub new {
my $self = bless {}, $class;
my $cv = AE::cv;
+ my $success = $args{on_success} || $cv;
+ my $failure = $args{on_failure} || $cv;
$self->{connection} = $args{connection};
my $channel = sub {
@@ -20,11 +22,11 @@ sub new {
on_success => sub {
$self->{channel} = shift;
$self->{channel}->qos;
- $cv->send($self);
+ $success->($self);
},
on_failure => sub {
warn "Channel failed: @_";
- $cv->send();
+ $failure->();
}
);
};
@@ -38,7 +40,7 @@ sub new {
on_success => $channel,
on_failure => sub {
warn "Connect failed: @_";
- $cv->send();
+ $failure->();
}
);
}
@@ -60,7 +62,10 @@ sub new {
$self->{unserialize} = sub { (@{ Storable::thaw(@_) })[0] };
}
- # Block on having set up the channel
+ # If they have a callback waiting for them, bail now
+ return if $args{on_success};
+
+ # Otherwise, block on having set up the channel
return $cv->recv;
}
commit 811ef13cd1f69eb43eb551262964e5105f9e0f00
Author: Alex Vandiver <alexmv at bestpractical.com>
Date: Mon Mar 26 01:14:21 2012 -0400
Use $callback->() instead of $callback->send() so they work with subs, not just condvars
diff --git a/lib/AnyEvent/RabbitMQ/RPC.pm b/lib/AnyEvent/RabbitMQ/RPC.pm
index d007244..1991d7e 100644
--- a/lib/AnyEvent/RabbitMQ/RPC.pm
+++ b/lib/AnyEvent/RabbitMQ/RPC.pm
@@ -211,7 +211,7 @@ sub call {
}
my $sent_failure = $args{on_sent} ? sub {
- $args{on_sent}->send(0);
+ $args{on_sent}->(0);
$args{on_failure}->(@_);
} : $args{on_failure};
@@ -238,7 +238,7 @@ sub call {
delivery_mode => 2, # Persistent storage
},
);
- $args{on_sent}->send(1) if $args{on_sent};
+ $args{on_sent}->(1) if $args{on_sent};
};
unless ($args{on_reply}) {
-----------------------------------------------------------------------
More information about the Bps-public-commit
mailing list