[Rt-commit] r6301 - in IPC-PubSub: lib/IPC lib/IPC/PubSub lib/IPC/PubSub/Cache lib/IPC/PubSub/Cache/JiftyDBI lib/IPC/PubSub/Cache/JiftyDBI/Stash

audreyt at bestpractical.com audreyt at bestpractical.com
Wed Oct 25 21:23:21 EDT 2006


Author: audreyt
Date: Wed Oct 25 21:23:20 2006
New Revision: 6301

Added:
   IPC-PubSub/lib/IPC/PubSub/Cache.pm
Removed:
   IPC-PubSub/IPC-PubSub-0.10.tar.gz
Modified:
   IPC-PubSub/Changes
   IPC-PubSub/MANIFEST
   IPC-PubSub/META.yml
   IPC-PubSub/Makefile
   IPC-PubSub/Makefile.PL
   IPC-PubSub/lib/IPC/PubSub.pm
   IPC-PubSub/lib/IPC/PubSub/Cache/DBM_Deep.pm
   IPC-PubSub/lib/IPC/PubSub/Cache/JiftyDBI.pm
   IPC-PubSub/lib/IPC/PubSub/Cache/JiftyDBI/Stash.pm
   IPC-PubSub/lib/IPC/PubSub/Cache/JiftyDBI/Stash/Item.pm
   IPC-PubSub/lib/IPC/PubSub/Cache/JiftyDBI/Stash/Publisher.pm
   IPC-PubSub/lib/IPC/PubSub/Cache/Memcached.pm
   IPC-PubSub/lib/IPC/PubSub/Cache/PlainHash.pm
   IPC-PubSub/lib/IPC/PubSub/Cacheable.pm

Log:
* This be 0.20.

Modified: IPC-PubSub/Changes
==============================================================================
--- IPC-PubSub/Changes	(original)
+++ IPC-PubSub/Changes	Wed Oct 25 21:23:20 2006
@@ -1,3 +1,9 @@
+[Changes for 0.20 - 2006-10-25]
+
+* Memcached: Remove the debug messages accidentally left in ->lock and ->unlock.
+* New ->modify API for IPC::PubSub and Cache to atomically manipulate cache.
+* Time::HiRes is now required to reduce locking contention.
+
 [Changes for 0.11 - 2006-10-25]
 
 * The Memcached backend now takes a namespace parameter to avoid collision.

Modified: IPC-PubSub/MANIFEST
==============================================================================
--- IPC-PubSub/MANIFEST	(original)
+++ IPC-PubSub/MANIFEST	Wed Oct 25 21:23:20 2006
@@ -8,6 +8,7 @@
 inc/Module/Install/Win32.pm
 inc/Module/Install/WriteAll.pm
 lib/IPC/PubSub.pm
+lib/IPC/PubSub/Cache.pm
 lib/IPC/PubSub/Cache/DBM_Deep.pm
 lib/IPC/PubSub/Cache/JiftyDBI.pm
 lib/IPC/PubSub/Cache/JiftyDBI/Stash.pm
@@ -20,5 +21,6 @@
 lib/IPC/PubSub/Subscriber.pm
 Makefile.PL
 MANIFEST			This list of files
+META.yml
 README
 t/basic.t

Modified: IPC-PubSub/META.yml
==============================================================================
--- IPC-PubSub/META.yml	(original)
+++ IPC-PubSub/META.yml	Wed Oct 25 21:23:20 2006
@@ -12,5 +12,6 @@
   Class::Accessor::Fast: 0
   DBM::Deep: 0
   Data::UUID: 0
-  perl: 5.005
-version: 0.11
+  Time::HiRes: 0
+  perl: 5.6.0
+version: 0.20

Modified: IPC-PubSub/Makefile
==============================================================================
--- IPC-PubSub/Makefile	(original)
+++ IPC-PubSub/Makefile	Wed Oct 25 21:23:20 2006
@@ -17,9 +17,9 @@
 #     NAME => q[IPC::PubSub]
 #     NO_META => q[1]
 #     PL_FILES => {  }
-#     PREREQ_PM => { DBM::Deep=>q[0], Class::Accessor::Fast=>q[0], Data::UUID=>q[0] }
+#     PREREQ_PM => { DBM::Deep=>q[0], Time::HiRes=>q[0], Class::Accessor::Fast=>q[0], Data::UUID=>q[0] }
 #     SIGN => q[1]
-#     VERSION => q[0.11]
+#     VERSION => q[0.20]
 #     dist => { PREOP=>q[$(PERL) -I. -MModule::Install::Admin -e "dist_preop(q($(DISTVNAME)))"] }
 
 # --- MakeMaker post_initialize section:
@@ -60,11 +60,11 @@
 DFSEP = $(DIRFILESEP)
 NAME = IPC::PubSub
 NAME_SYM = IPC_PubSub
-VERSION = 0.11
+VERSION = 0.20
 VERSION_MACRO = VERSION
-VERSION_SYM = 0_11
+VERSION_SYM = 0_20
 DEFINE_VERSION = -D$(VERSION_MACRO)=\"$(VERSION)\"
-XS_VERSION = 0.11
+XS_VERSION = 0.20
 XS_VERSION_MACRO = XS_VERSION
 XS_DEFINE_VERSION = -D$(XS_VERSION_MACRO)=\"$(XS_VERSION)\"
 INST_ARCHLIB = blib/arch
@@ -181,6 +181,7 @@
 
 
 TO_INST_PM = lib/IPC/PubSub.pm \
+	lib/IPC/PubSub/Cache.pm \
 	lib/IPC/PubSub/Cache/DBM_Deep.pm \
 	lib/IPC/PubSub/Cache/JiftyDBI.pm \
 	lib/IPC/PubSub/Cache/JiftyDBI/Stash.pm \
@@ -200,6 +201,8 @@
 	blib/lib/IPC/PubSub.pm \
 	lib/IPC/PubSub/Cache/Memcached.pm \
 	blib/lib/IPC/PubSub/Cache/Memcached.pm \
+	lib/IPC/PubSub/Cache.pm \
+	blib/lib/IPC/PubSub/Cache.pm \
 	lib/IPC/PubSub/Cache/PlainHash.pm \
 	blib/lib/IPC/PubSub/Cache/PlainHash.pm \
 	lib/IPC/PubSub/Subscriber.pm \
@@ -280,7 +283,7 @@
 DIST_CP = best
 DIST_DEFAULT = tardist
 DISTNAME = IPC-PubSub
-DISTVNAME = IPC-PubSub-0.11
+DISTVNAME = IPC-PubSub-0.20
 
 
 # --- MakeMaker macro section:
@@ -785,7 +788,7 @@
 # --- MakeMaker ppd section:
 # Creates a PPD (Perl Package Description) for a binary distribution.
 ppd:
-	$(NOECHO) $(ECHO) '<SOFTPKG NAME="$(DISTNAME)" VERSION="0,11,0,0">' > $(DISTNAME).ppd
+	$(NOECHO) $(ECHO) '<SOFTPKG NAME="$(DISTNAME)" VERSION="0,20,0,0">' > $(DISTNAME).ppd
 	$(NOECHO) $(ECHO) '    <TITLE>$(DISTNAME)</TITLE>' >> $(DISTNAME).ppd
 	$(NOECHO) $(ECHO) '    <ABSTRACT>Interprocess Publish/Subscribe channels</ABSTRACT>' >> $(DISTNAME).ppd
 	$(NOECHO) $(ECHO) '    <AUTHOR>Audrey Tang &lt;cpan at audreyt.org&gt;</AUTHOR>' >> $(DISTNAME).ppd
@@ -793,6 +796,7 @@
 	$(NOECHO) $(ECHO) '        <DEPENDENCY NAME="Class-Accessor-Fast" VERSION="0,0,0,0" />' >> $(DISTNAME).ppd
 	$(NOECHO) $(ECHO) '        <DEPENDENCY NAME="DBM-Deep" VERSION="0,0,0,0" />' >> $(DISTNAME).ppd
 	$(NOECHO) $(ECHO) '        <DEPENDENCY NAME="Data-UUID" VERSION="0,0,0,0" />' >> $(DISTNAME).ppd
+	$(NOECHO) $(ECHO) '        <DEPENDENCY NAME="Time-HiRes" VERSION="0,0,0,0" />' >> $(DISTNAME).ppd
 	$(NOECHO) $(ECHO) '        <OS NAME="$(OSNAME)" />' >> $(DISTNAME).ppd
 	$(NOECHO) $(ECHO) '        <ARCHITECTURE NAME="darwin-2level" />' >> $(DISTNAME).ppd
 	$(NOECHO) $(ECHO) '        <CODEBASE HREF="" />' >> $(DISTNAME).ppd
@@ -808,6 +812,7 @@
 	  lib/IPC/PubSub/Cache/DBM_Deep.pm blib/lib/IPC/PubSub/Cache/DBM_Deep.pm \
 	  lib/IPC/PubSub.pm blib/lib/IPC/PubSub.pm \
 	  lib/IPC/PubSub/Cache/Memcached.pm blib/lib/IPC/PubSub/Cache/Memcached.pm \
+	  lib/IPC/PubSub/Cache.pm blib/lib/IPC/PubSub/Cache.pm \
 	  lib/IPC/PubSub/Cache/PlainHash.pm blib/lib/IPC/PubSub/Cache/PlainHash.pm \
 	  lib/IPC/PubSub/Subscriber.pm blib/lib/IPC/PubSub/Subscriber.pm \
 	  lib/IPC/PubSub/Cache/JiftyDBI/Stash/Publisher.pm blib/lib/IPC/PubSub/Cache/JiftyDBI/Stash/Publisher.pm \

Modified: IPC-PubSub/Makefile.PL
==============================================================================
--- IPC-PubSub/Makefile.PL	(original)
+++ IPC-PubSub/Makefile.PL	Wed Oct 25 21:23:20 2006
@@ -5,6 +5,7 @@
 license     'MIT';
 all_from    'lib/IPC/PubSub.pm';
 
+requires    'Time::HiRes';
 requires    'DBM::Deep';
 requires    'Data::UUID';
 requires    'Class::Accessor::Fast';

Modified: IPC-PubSub/lib/IPC/PubSub.pm
==============================================================================
--- IPC-PubSub/lib/IPC/PubSub.pm	(original)
+++ IPC-PubSub/lib/IPC/PubSub.pm	Wed Oct 25 21:23:20 2006
@@ -1,8 +1,9 @@
 package IPC::PubSub;
-$IPC::PubSub::VERSION = '0.11';
+$IPC::PubSub::VERSION = '0.20';
 
-use 5.005;
+use 5.006;
 use strict;
+use IPC::PubSub::Cacheable;
 use IPC::PubSub::Publisher;
 use IPC::PubSub::Subscriber;
 use base qw/Class::Accessor::Fast/;
@@ -18,7 +19,7 @@
     eval { require "IPC/PubSub/Cache/$backend.pm" }
         or die "Cannot load backend module: IPC::PubSub::Cache::$backend: $@";
 
-    $self->_cache("IPC::PubSub::Cache::$backend"->new(@_));
+    $self->_cache(IPC::PubSub::Cacheable->new($backend => \@_));
     return $self;
 }
 
@@ -36,6 +37,7 @@
 sub store   { (+shift)->_cache->store(@_)   }
 sub lock    { (+shift)->_cache->lock(@_)    }
 sub unlock  { (+shift)->_cache->unlock(@_)  }
+sub modify  { (+shift)->_cache->modify(@_)  }
 
 1;
 

Added: IPC-PubSub/lib/IPC/PubSub/Cache.pm
==============================================================================
--- (empty file)
+++ IPC-PubSub/lib/IPC/PubSub/Cache.pm	Wed Oct 25 21:23:20 2006
@@ -0,0 +1,79 @@
+package IPC::PubSub::Cache;
+
+use strict;
+use File::Spec;
+use Time::HiRes ();
+#method fetch                (Str *@keys --> List of Pair)                   { ... }
+#method store                (Str $key, Str $val, Num $time, Num $expiry)    { ... }
+
+#method add_publisher        (Str $chan, Str $pub)                           { ... }
+#method remove_publisher     (Str $chan, Str $pub)                           { ... }
+
+#method get_index            (Str $chan, Str $pub --> Int)                   { ... }
+#method set_index            (Str $chan, Str $pub, Int $index)               { ... }
+
+#method publisher_indices    (Str $chan --> Hash of Int)                     { ... }
+
+sub modify {
+    my $self = shift;
+    my $key  = shift;
+    return (($self->fetch("data-$key"))[0] || [])->[-1] unless @_;
+
+    if (ref($_[0]) eq 'CODE') {
+        $self->lock("lock-$key");
+        local $_ = (($self->fetch("data-$key"))[0] || [])->[-1];
+        my $rv = $_[0]->();
+        $self->store("data-$key" => $_);
+        $self->unlock("lock-$key");
+        return $rv;
+    }
+    else {
+        $self->store("data-$key" => $_[0]);
+        return $_[0]
+    }
+}
+
+sub get {
+    my ($self, $chan, $orig, $curr) = @_;
+
+    no warnings 'uninitialized';
+    sort { $a->[0] <=> $b->[0] } $self->fetch(
+        map {
+            my $pub = $_;
+            my $index = $curr->{$pub};
+            map {
+                "$chan-$pub-$_"
+            } (($orig->{$pub}+1) .. $index);
+        } keys(%$curr)
+    );
+}
+
+sub put {
+    my ($self, $chan, $pub, $index, $msg, $expiry) = @_;
+    $self->store("$chan-$pub-$index", $msg, Time::HiRes::time(), $expiry);
+    $self->set_index($chan, $pub, $index);
+}
+
+
+use constant LOCK => File::Spec->catdir(File::Spec->tmpdir, 'IPC::PubSub-lock-');
+
+my %locks;
+sub lock {
+    my ($self, $chan) = @_;
+    for my $i (1..10) {
+        return if mkdir((LOCK . unpack("H*", $chan)), 0777);
+        Time::HiRes::usleep(rand(250000)+250000);
+    }
+}
+
+END {
+    rmdir(LOCK . unpack("H*", $_)) for keys %locks;
+}
+
+sub unlock {
+    my ($self, $chan) = @_;
+    rmdir(LOCK . unpack("H*", $chan));
+    delete $locks{$chan};
+}
+
+1;

Modified: IPC-PubSub/lib/IPC/PubSub/Cache/DBM_Deep.pm
==============================================================================
--- IPC-PubSub/lib/IPC/PubSub/Cache/DBM_Deep.pm	(original)
+++ IPC-PubSub/lib/IPC/PubSub/Cache/DBM_Deep.pm	Wed Oct 25 21:23:20 2006
@@ -1,6 +1,6 @@
 package IPC::PubSub::Cache::DBM_Deep;
 use strict;
-use base 'IPC::PubSub::Cacheable';
+use base 'IPC::PubSub::Cache';
 use DBM::Deep;
 use File::Temp qw/ tempfile /;
 

Modified: IPC-PubSub/lib/IPC/PubSub/Cache/JiftyDBI.pm
==============================================================================
--- IPC-PubSub/lib/IPC/PubSub/Cache/JiftyDBI.pm	(original)
+++ IPC-PubSub/lib/IPC/PubSub/Cache/JiftyDBI.pm	Wed Oct 25 21:23:20 2006
@@ -1,6 +1,6 @@
 package IPC::PubSub::Cache::JiftyDBI;
 use strict;
-use base 'IPC::PubSub::Cacheable';
+use base 'IPC::PubSub::Cache';
 use IPC::PubSub::Cache::JiftyDBI::Stash;
 my %cache;
 
@@ -37,7 +37,15 @@
     my ($self, $key, $val, $time, $expiry) = @_;
     $expiry ||= 0;
     my $item = IPC::PubSub::Cache::JiftyDBI::Stash::Item->new(handle => $STASH->handle);
-    $item->create( key => $key, expiry => ($time+$expiry), val => $val);
+
+    $item->load_by_cols( key => $key );
+    if ( $item->id ) {
+        $item->set_val($val);
+        $item->set_expiry($time+$expiry);
+    }
+    else {
+        $item->create( key => $key, expiry => ($time+$expiry), val => $val );
+    }
 }
 
 sub publisher_indices {

Modified: IPC-PubSub/lib/IPC/PubSub/Cache/JiftyDBI/Stash.pm
==============================================================================
--- IPC-PubSub/lib/IPC/PubSub/Cache/JiftyDBI/Stash.pm	(original)
+++ IPC-PubSub/lib/IPC/PubSub/Cache/JiftyDBI/Stash.pm	Wed Oct 25 21:23:20 2006
@@ -18,8 +18,21 @@
     my %args = (
         db_init => 0,
         db_config => undef,
+        db_handle => undef,
+        table_prefix => 'pubsub_',
         @_
     );
+
+    if ($args{'table_prefix'}) {
+
+         IPC::PubSub::Cache::JiftyDBI::Stash::Item->table_prefix($args{'table_prefix'});
+         IPC::PubSub::Cache::JiftyDBI::Stash::Publisher->table_prefix($args{'table_prefix'});
+    }
+
+    if ($args{'db_handle'}) {
+            $self->handle($args{'db_handle'});
+
+    } else {
     unless ( $args{'db_config'} ) {
         my $filename;
         ( undef, $filename ) = tempfile();
@@ -28,10 +41,12 @@
     }
 
     $self->_connect( %{$args{'db_config'}} );
+
+    }
     if ( $args{'db_init'} ) {
         $self->_generate_db();
     }
-    $self;
+    return $self;
 }
 
 sub handle {

Modified: IPC-PubSub/lib/IPC/PubSub/Cache/JiftyDBI/Stash/Item.pm
==============================================================================
--- IPC-PubSub/lib/IPC/PubSub/Cache/JiftyDBI/Stash/Item.pm	(original)
+++ IPC-PubSub/lib/IPC/PubSub/Cache/JiftyDBI/Stash/Item.pm	Wed Oct 25 21:23:20 2006
@@ -3,6 +3,8 @@
 use warnings;
 use strict;
 
+use vars qw/$TABLE_PREFIX/;
+
 use Jifty::DBI::Schema;
 use Jifty::DBI::Record schema {
     column key    => type is 'text';
@@ -10,6 +12,19 @@
     column expiry => type is 'int';
 };
 
+
+sub table_prefix {
+    my $self = shift;
+    $TABLE_PREFIX = shift if (@_);
+    return ($TABLE_PREFIX);
+}
+
+sub table {
+    my $self = shift;
+    return $self->table_prefix . $self->SUPER::table();
+}
+
+
 package IPC::PubSub::Cache::JiftyDBI::Stash::ItemCollection;
 use base qw/Jifty::DBI::Collection/;
 

Modified: IPC-PubSub/lib/IPC/PubSub/Cache/JiftyDBI/Stash/Publisher.pm
==============================================================================
--- IPC-PubSub/lib/IPC/PubSub/Cache/JiftyDBI/Stash/Publisher.pm	(original)
+++ IPC-PubSub/lib/IPC/PubSub/Cache/JiftyDBI/Stash/Publisher.pm	Wed Oct 25 21:23:20 2006
@@ -3,6 +3,8 @@
 use warnings;
 use strict;
 
+use vars qw/$TABLE_PREFIX/;
+
 use Jifty::DBI::Schema;
 use Jifty::DBI::Record schema {
     column channel    => type is 'text';
@@ -11,6 +13,17 @@
 };
 
 
+sub table_prefix {
+    my $self = shift;
+    $TABLE_PREFIX = shift if (@_);
+    return ($TABLE_PREFIX);
+}
+
+sub table {
+    my $self = shift;
+    return $self->table_prefix . $self->SUPER::table();
+}
+
 package IPC::PubSub::Cache::JiftyDBI::Stash::PublisherCollection;
 use base qw/Jifty::DBI::Collection/;
 

Modified: IPC-PubSub/lib/IPC/PubSub/Cache/Memcached.pm
==============================================================================
--- IPC-PubSub/lib/IPC/PubSub/Cache/Memcached.pm	(original)
+++ IPC-PubSub/lib/IPC/PubSub/Cache/Memcached.pm	Wed Oct 25 21:23:20 2006
@@ -1,7 +1,8 @@
 package IPC::PubSub::Cache::Memcached;
 use strict;
-use base 'IPC::PubSub::Cacheable';
+use base 'IPC::PubSub::Cache';
 use Cache::Memcached;
+use Time::HiRes ();
 
 sub new {
     my $class       = shift;
@@ -38,18 +39,14 @@
 
 sub lock {
     my ($self, $chan) = @_;
-    warn "trying to acquire lock: $chan\n";
-    for my $i (1..10) {
+    for my $i (1..100) {
         return if $$self->add("$chan#lock" => 1);
-        warn "contention: $chan\n";
-        return;
-        sleep 1;
+        Time::HiRes::usleep(rand(250000)+250000);
     }
 }
 
 sub unlock {
     my ($self, $chan) = @_;
-    warn "trying to release lock: $chan\n";
     $$self->delete("$chan#lock");
 }
 

Modified: IPC-PubSub/lib/IPC/PubSub/Cache/PlainHash.pm
==============================================================================
--- IPC-PubSub/lib/IPC/PubSub/Cache/PlainHash.pm	(original)
+++ IPC-PubSub/lib/IPC/PubSub/Cache/PlainHash.pm	Wed Oct 25 21:23:20 2006
@@ -1,6 +1,6 @@
 package IPC::PubSub::Cache::PlainHash;
 use strict;
-use base 'IPC::PubSub::Cacheable';
+use base 'IPC::PubSub::Cache';
 
 my %cache;
 

Modified: IPC-PubSub/lib/IPC/PubSub/Cacheable.pm
==============================================================================
--- IPC-PubSub/lib/IPC/PubSub/Cacheable.pm	(original)
+++ IPC-PubSub/lib/IPC/PubSub/Cacheable.pm	Wed Oct 25 21:23:20 2006
@@ -1,60 +1,41 @@
 package IPC::PubSub::Cacheable;
 use strict;
-use Time::HiRes 'time';
-use File::Spec;
+use Scalar::Util qw( refaddr );
 
-#method fetch                (Str *@keys --> List of Pair)                   { ... }
-#method store                (Str $key, Str $val, Num $time, Num $expiry)    { ... }
-
-#method add_publisher        (Str $chan, Str $pub)                           { ... }
-#method remove_publisher     (Str $chan, Str $pub)                           { ... }
-
-#method get_index            (Str $chan, Str $pub --> Int)                   { ... }
-#method set_index            (Str $chan, Str $pub, Int $index)               { ... }
-
-#method publisher_indices    (Str $chan --> Hash of Int)                     { ... }
-
-sub get {
-    my ($self, $chan, $orig, $curr) = @_;
-
-    no warnings 'uninitialized';
-    sort { $a->[0] <=> $b->[0] } $self->fetch(
-        map {
-            my $pub = $_;
-            my $index = $curr->{$pub};
-            map {
-                "$chan-$pub-$_"
-            } (($orig->{$pub}+1) .. $index);
-        } keys(%$curr)
-    );
+my %Cache;
+sub new {
+    my $class = shift;
+    my $self  = bless(\@_, $class);
+    $self->BUILD;
+    return $self;
 }
 
-sub put {
-    my ($self, $chan, $pub, $index, $msg, $expiry) = @_;
-    $self->store("$chan-$pub-$index", $msg, time, $expiry);
-    $self->set_index($chan, $pub, $index);
+sub BUILD {
+    my $self    = shift;
+    $Cache{ refaddr($self) } ||= do {
+        require "IPC/PubSub/Cache/$self->[0].pm";
+        "IPC::PubSub::Cache::$self->[0]"->new(@{$self->[1]});
+    };
 }
 
+sub AUTOLOAD {
+    no strict 'refs';
+    no warnings 'uninitialized';
 
-use constant LOCK => File::Spec->catdir(File::Spec->tmpdir, 'IPC::PubSub-lock-');
-
-my %locks;
-sub lock {
-    my ($self, $chan) = @_;
-    for my $i (1..10) {
-        return if mkdir((LOCK . unpack("H*", $chan)), 0777);
-        sleep 1;
-    }
-}
-
-END {
-    rmdir(LOCK . unpack("H*", $_)) for keys %locks;
+    my $meth    = (substr(our $AUTOLOAD, rindex($AUTOLOAD, '::') + 2) || $AUTOLOAD);
+    my $code    = sub {
+        my $self    = shift;
+        my $cache   = $self->BUILD;
+        unshift @_, $cache;
+        goto &{$cache->can($meth)};
+    };
+    *$meth = $code;
+    goto &$code;
 }
 
-sub unlock {
-    my ($self, $chan) = @_;
-    rmdir(LOCK . unpack("H*", $chan));
-    delete $locks{$chan};
+sub DESTROY {
+    my $self = shift;
+    delete $Cache{ refaddr($self) };
 }
 
 1;


More information about the Rt-commit mailing list