[Bps-public-commit] r17170 - in IPC-PubSub: . lib/IPC/PubSub
alexmv at bestpractical.com
alexmv at bestpractical.com
Thu Dec 11 00:44:07 EST 2008
Author: alexmv
Date: Thu Dec 11 00:44:06 2008
New Revision: 17170
Added:
IPC-PubSub/t/publisher.t
Modified:
IPC-PubSub/ (props changed)
IPC-PubSub/lib/IPC/PubSub/Publisher.pm
Log:
r39802 at kohr-ah: chmrr | 2008-12-11 00:43:59 -0500
* Publisher and index fixes
Modified: IPC-PubSub/lib/IPC/PubSub/Publisher.pm
==============================================================================
--- IPC-PubSub/lib/IPC/PubSub/Publisher.pm (original)
+++ IPC-PubSub/lib/IPC/PubSub/Publisher.pm Thu Dec 11 00:44:06 2008
@@ -3,7 +3,8 @@
use warnings;
use base qw/Class::Accessor::Fast/;
-__PACKAGE__->mk_accessors(qw/expiry _indice _uuid _cache/);
+__PACKAGE__->mk_accessors(qw/expiry/);
+__PACKAGE__->mk_ro_accessors(qw/_indice uuid _cache/);
sub new {
my $class = shift;
@@ -16,7 +17,7 @@
expiry => 0,
_cache => $cache,
_indice => { map { $_ => 1 } @_ },
- _uuid => $uuid,
+ uuid => $uuid,
});
$cache->add_publisher($_, $uuid) for @_;
return $self;
@@ -32,19 +33,19 @@
sub publish {
my $self = shift;
$self->_indice->{$_} ||= do {
- $self->_cache->add_publisher($_);
+ $self->_cache->add_publisher($_, $self->uuid);
1;
} for @_;
}
sub unpublish {
my $self = shift;
- delete($self->_indice->{$_}) and $self->_cache->remove_publisher($_) for @_;
+ delete($self->_indice->{$_}) and $self->_cache->remove_publisher($_, $self->uuid) for @_;
}
sub msg {
my $self = shift;
- my $uuid = $self->_uuid;
+ my $uuid = $self->uuid;
my $indice = $self->_indice;
my $expiry = $self->expiry;
foreach my $msg (@_) {
@@ -59,7 +60,7 @@
sub DESTROY {
my $self = shift;
return unless $self->_cache;
- $self->_cache->remove_publisher($_, $self->_cache) for $self->channels;
+ $self->_cache->remove_publisher($_, $self->uuid) for $self->channels;
}
1;
Added: IPC-PubSub/t/publisher.t
==============================================================================
--- (empty file)
+++ IPC-PubSub/t/publisher.t Thu Dec 11 00:44:06 2008
@@ -0,0 +1,87 @@
+use strict;
+use warnings;
+use Test::More;
+use IPC::PubSub;
+use IO::Socket::INET;
+use File::Temp ':POSIX';
+
+my @backends = qw(PlainHash);
+
+unshift @backends, 'DBM_Deep' if eval { require DBM::Deep };
+unshift @backends, 'JiftyDBI' if eval { require Jifty::DBI };
+unshift @backends, 'Memcached' if eval { require Cache::Memcached } and IO::Socket::INET->new('127.0.0.1:11211');
+
+plan tests => 33 * scalar @backends;
+
+my $tmp = tmpnam();
+END { unlink $tmp }
+
+my %init_args = (
+ DBM_Deep => [ $tmp ],
+ JiftyDBI => [ db_init => 1 ],
+ Memcached => [ rand() . $$ ],
+);
+
+SKIP: for my $backend (@backends) {
+ diag("Testing backend $backend");
+
+ my $bus = IPC::PubSub->new( $backend, @{ $init_args{$backend} } );
+ my $pub = $bus->new_publisher( "first", "second" );
+ my $cache = $bus->_cache;
+
+ is_deeply( scalar $pub->channels, { first => 1, second => 1 } );
+ is_deeply( [ sort $pub->channels ], [ "first", "second" ] );
+ is_deeply( $cache->publisher_indices("first"), { $pub->uuid => 0 } );
+ is_deeply( $cache->publisher_indices("second"), { $pub->uuid => 0 } );
+ is_deeply( $cache->publisher_indices("third"), {} );
+
+ $pub->publish("third");
+ is_deeply( scalar $pub->channels,
+ { first => 1, second => 1, third => 1 } );
+ is_deeply( [ sort $pub->channels ], [ "first", "second", "third" ] );
+ is_deeply( $cache->publisher_indices("first"), { $pub->uuid => 0 } );
+ is_deeply( $cache->publisher_indices("second"), { $pub->uuid => 0 } );
+ is_deeply( $cache->publisher_indices("third"), { $pub->uuid => 0 } );
+
+ $pub->publish("third");
+ is_deeply( scalar $pub->channels,
+ { first => 1, second => 1, third => 1 } );
+ is_deeply( [ sort $pub->channels ], [ "first", "second", "third" ] );
+ is_deeply( $cache->publisher_indices("third"), { $pub->uuid => 0 } );
+
+ $pub->msg("message 1");
+ is_deeply( scalar $pub->channels,
+ { first => 2, second => 2, third => 2 } );
+
+ $pub->unpublish("second");
+ is_deeply( scalar $pub->channels, { first => 2, third => 2 } );
+
+ $pub->msg("message 2");
+ is_deeply( scalar $pub->channels, { first => 3, third => 3 } );
+
+ is_deeply( $cache->publisher_indices("first"), { $pub->uuid => 2 } );
+ is_deeply( $cache->publisher_indices("second"), {} );
+ is_deeply( $cache->publisher_indices("third"), { $pub->uuid => 2 } );
+
+ is($cache->get_index( first => $pub->uuid ), 2 );
+ $cache->set_index( first => $pub->uuid, 5 );
+ is($cache->get_index( first => $pub->uuid ), 5 );
+ is_deeply( $cache->publisher_indices("first"), { $pub->uuid => 5 } );
+
+ {
+ my $pub2 = $bus->new_publisher( "first", "second", "third" );
+ is_deeply( scalar $pub2->channels, { first => 1, second => 1, third => 1 } );
+ is_deeply( $cache->publisher_indices("first"), { $pub->uuid => 5, $pub2->uuid => 0 } );
+ is_deeply( $cache->publisher_indices("second"), { $pub2->uuid => 0 } );
+ is_deeply( $cache->publisher_indices("third"), { $pub->uuid => 2, $pub2->uuid => 0 } );
+
+ $pub2->unpublish("first");
+ is_deeply( scalar $pub2->channels, { second => 1, third => 1 } );
+ is_deeply( $cache->publisher_indices("first"), { $pub->uuid => 5 } );
+ is_deeply( $cache->publisher_indices("second"), { $pub2->uuid => 0 } );
+ is_deeply( $cache->publisher_indices("third"), { $pub->uuid => 2, $pub2->uuid => 0 } );
+ }
+ is_deeply( $cache->publisher_indices("first"), { $pub->uuid => 5 } );
+ is_deeply( $cache->publisher_indices("second"), {} );
+ is_deeply( $cache->publisher_indices("third"), { $pub->uuid => 2 } );
+}
More information about the Bps-public-commit mailing list