[Bps-public-commit] r17170 - in IPC-PubSub: . lib/IPC/PubSub

alexmv at bestpractical.com alexmv at bestpractical.com
Thu Dec 11 00:44:07 EST 2008


Author: alexmv
Date: Thu Dec 11 00:44:06 2008
New Revision: 17170

Added:
   IPC-PubSub/t/publisher.t
Modified:
   IPC-PubSub/   (props changed)
   IPC-PubSub/lib/IPC/PubSub/Publisher.pm

Log:
 r39802 at kohr-ah:  chmrr | 2008-12-11 00:43:59 -0500
  * Publisher and index fixes


Modified: IPC-PubSub/lib/IPC/PubSub/Publisher.pm
==============================================================================
--- IPC-PubSub/lib/IPC/PubSub/Publisher.pm	(original)
+++ IPC-PubSub/lib/IPC/PubSub/Publisher.pm	Thu Dec 11 00:44:06 2008
@@ -3,7 +3,8 @@
 use warnings;
 use base qw/Class::Accessor::Fast/;
 
-__PACKAGE__->mk_accessors(qw/expiry _indice _uuid _cache/);
+__PACKAGE__->mk_accessors(qw/expiry/);
+__PACKAGE__->mk_ro_accessors(qw/_indice uuid _cache/);
 
 sub new {
     my $class   = shift;
@@ -16,7 +17,7 @@
         expiry  => 0,
         _cache  => $cache,
         _indice => { map { $_ => 1 } @_ },
-        _uuid   => $uuid,
+        uuid    => $uuid,
     });
     $cache->add_publisher($_, $uuid) for @_;
     return $self;
@@ -32,19 +33,19 @@
 sub publish {
     my $self = shift;
     $self->_indice->{$_} ||= do {
-        $self->_cache->add_publisher($_);
+        $self->_cache->add_publisher($_, $self->uuid);
         1;
     } for @_;
 }
 
 sub unpublish {
     my $self = shift;
-    delete($self->_indice->{$_}) and $self->_cache->remove_publisher($_) for @_;
+    delete($self->_indice->{$_}) and $self->_cache->remove_publisher($_, $self->uuid) for @_;
 }
 
 sub msg {
     my $self    = shift;
-    my $uuid    = $self->_uuid;
+    my $uuid    = $self->uuid;
     my $indice  = $self->_indice;
     my $expiry  = $self->expiry;
     foreach my $msg (@_) {
@@ -59,7 +60,7 @@
 sub DESTROY {
     my $self = shift;
     return unless $self->_cache;
-    $self->_cache->remove_publisher($_, $self->_cache) for $self->channels;
+    $self->_cache->remove_publisher($_, $self->uuid) for $self->channels;
 }
 
 1;

Added: IPC-PubSub/t/publisher.t
==============================================================================
--- (empty file)
+++ IPC-PubSub/t/publisher.t	Thu Dec 11 00:44:06 2008
@@ -0,0 +1,87 @@
+use strict;
+use warnings;
+use Test::More;
+use IPC::PubSub;
+use IO::Socket::INET;
+use File::Temp ':POSIX';
+# PlainHash is always tested; the optional backends below are added only when usable.
+my @backends = qw(PlainHash);
+# Each optional backend needs its module to load; Memcached also needs a connection to 127.0.0.1:11211.
+unshift @backends, 'DBM_Deep' if eval { require DBM::Deep };
+unshift @backends, 'JiftyDBI' if eval { require Jifty::DBI };
+unshift @backends, 'Memcached' if eval { require Cache::Memcached } and IO::Socket::INET->new('127.0.0.1:11211');
+# The loop below makes 33 assertions for each backend.
+plan tests => 33 * scalar @backends;
+# Temporary file name passed to the DBM_Deep backend; the END block unlinks it at exit.
+my $tmp = tmpnam();
+END { unlink $tmp }
+# Extra constructor arguments per backend; PlainHash has no entry here.
+my %init_args = (
+    DBM_Deep    => [ $tmp ],
+    JiftyDBI    => [ db_init => 1 ],
+    Memcached   => [ rand() . $$ ],
+);
+# Run the same sequence of checks against every selected backend.
+SKIP: for my $backend (@backends) {
+    diag("Testing backend $backend");
+
+    my $bus = IPC::PubSub->new( $backend, @{ $init_args{$backend} } );
+    my $pub = $bus->new_publisher( "first", "second" );
+    my $cache = $bus->_cache;
+    # A new publisher lists its channels, and the cache is expected to hold index 0 for it on each.
+    is_deeply( scalar $pub->channels, { first => 1, second => 1 } );
+    is_deeply( [ sort $pub->channels ], [ "first", "second" ] );
+    is_deeply( $cache->publisher_indices("first"),  { $pub->uuid => 0 } );
+    is_deeply( $cache->publisher_indices("second"), { $pub->uuid => 0 } );
+    is_deeply( $cache->publisher_indices("third"),  {} );
+    # Publishing to an extra channel should register it with the publisher and in the cache.
+    $pub->publish("third");
+    is_deeply( scalar $pub->channels,
+        { first => 1, second => 1, third => 1 } );
+    is_deeply( [ sort $pub->channels ], [ "first", "second", "third" ] );
+    is_deeply( $cache->publisher_indices("first"),  { $pub->uuid => 0 } );
+    is_deeply( $cache->publisher_indices("second"), { $pub->uuid => 0 } );
+    is_deeply( $cache->publisher_indices("third"),  { $pub->uuid => 0 } );
+    # Publishing to the same channel a second time is expected to change nothing.
+    $pub->publish("third");
+    is_deeply( scalar $pub->channels,
+        { first => 1, second => 1, third => 1 } );
+    is_deeply( [ sort $pub->channels ], [ "first", "second", "third" ] );
+    is_deeply( $cache->publisher_indices("third"),  { $pub->uuid => 0 } );
+    # Sending a message is expected to raise the publisher's per-channel values from 1 to 2.
+    $pub->msg("message 1");
+    is_deeply( scalar $pub->channels,
+        { first => 2, second => 2, third => 2 } );
+    # Unpublishing should drop the channel from the publisher's own list.
+    $pub->unpublish("second");
+    is_deeply( scalar $pub->channels, { first => 2, third => 2 } );
+    # A second message should advance only the channels that are still published.
+    $pub->msg("message 2");
+    is_deeply( scalar $pub->channels, { first => 3, third => 3 } );
+    # Cache-side view: index 2 for the remaining channels, nothing left for "second".
+    is_deeply( $cache->publisher_indices("first"),  { $pub->uuid => 2 } );
+    is_deeply( $cache->publisher_indices("second"), {} );
+    is_deeply( $cache->publisher_indices("third"),  { $pub->uuid => 2 } );
+    # An index written with set_index should be read back by get_index and publisher_indices.
+    is($cache->get_index( first => $pub->uuid  ), 2 );
+    $cache->set_index( first => $pub->uuid, 5 );
+    is($cache->get_index( first => $pub->uuid ), 5 );    
+    is_deeply( $cache->publisher_indices("first"),  { $pub->uuid => 5 } );
+    # A second publisher, scoped to the block below, shares the channels under its own uuid.
+    {
+        my $pub2 = $bus->new_publisher( "first", "second", "third" );
+        is_deeply( scalar $pub2->channels, { first => 1, second => 1, third => 1 } );
+        is_deeply( $cache->publisher_indices("first"),  { $pub->uuid => 5, $pub2->uuid => 0 } );
+        is_deeply( $cache->publisher_indices("second"), {                  $pub2->uuid => 0 } );
+        is_deeply( $cache->publisher_indices("third"),  { $pub->uuid => 2, $pub2->uuid => 0 } );
+        # After $pub2 unpublishes "first", only its own entry on that channel is expected to go.
+        $pub2->unpublish("first");
+        is_deeply( scalar $pub2->channels, { second => 1, third => 1 } );
+        is_deeply( $cache->publisher_indices("first"),  { $pub->uuid => 5                   } );
+        is_deeply( $cache->publisher_indices("second"), {                  $pub2->uuid => 0 } );
+        is_deeply( $cache->publisher_indices("third"),  { $pub->uuid => 2, $pub2->uuid => 0 } );
+    } # $pub2 is out of scope from here; the checks below expect its cache entries to be gone
+    is_deeply( $cache->publisher_indices("first"),  { $pub->uuid => 5 } );
+    is_deeply( $cache->publisher_indices("second"), {} );
+    is_deeply( $cache->publisher_indices("third"),  { $pub->uuid => 2 } );
+}



More information about the Bps-public-commit mailing list