[Rt-commit] r6301 - in IPC-PubSub: lib/IPC lib/IPC/PubSub lib/IPC/PubSub/Cache lib/IPC/PubSub/Cache/JiftyDBI lib/IPC/PubSub/Cache/JiftyDBI/Stash
audreyt at bestpractical.com
audreyt at bestpractical.com
Wed Oct 25 21:23:21 EDT 2006
Author: audreyt
Date: Wed Oct 25 21:23:20 2006
New Revision: 6301
Added:
IPC-PubSub/lib/IPC/PubSub/Cache.pm
Removed:
IPC-PubSub/IPC-PubSub-0.10.tar.gz
Modified:
IPC-PubSub/Changes
IPC-PubSub/MANIFEST
IPC-PubSub/META.yml
IPC-PubSub/Makefile
IPC-PubSub/Makefile.PL
IPC-PubSub/lib/IPC/PubSub.pm
IPC-PubSub/lib/IPC/PubSub/Cache/DBM_Deep.pm
IPC-PubSub/lib/IPC/PubSub/Cache/JiftyDBI.pm
IPC-PubSub/lib/IPC/PubSub/Cache/JiftyDBI/Stash.pm
IPC-PubSub/lib/IPC/PubSub/Cache/JiftyDBI/Stash/Item.pm
IPC-PubSub/lib/IPC/PubSub/Cache/JiftyDBI/Stash/Publisher.pm
IPC-PubSub/lib/IPC/PubSub/Cache/Memcached.pm
IPC-PubSub/lib/IPC/PubSub/Cache/PlainHash.pm
IPC-PubSub/lib/IPC/PubSub/Cacheable.pm
Log:
* This be 0.20.
Modified: IPC-PubSub/Changes
==============================================================================
--- IPC-PubSub/Changes (original)
+++ IPC-PubSub/Changes Wed Oct 25 21:23:20 2006
@@ -1,3 +1,9 @@
+[Changes for 0.20 - 2006-10-25]
+
+* Memcached: Remove the debug messages accidentally left in ->lock and ->unlock.
+* New ->modify API for IPC::PubSub and Cache to atomically manipulate cache.
+* Time::HiRes is now required to reduce locking contention.
+
[Changes for 0.11 - 2006-10-25]
* The Memcached backend now takes a namespace parameter to avoid collision.
Modified: IPC-PubSub/MANIFEST
==============================================================================
--- IPC-PubSub/MANIFEST (original)
+++ IPC-PubSub/MANIFEST Wed Oct 25 21:23:20 2006
@@ -8,6 +8,7 @@
inc/Module/Install/Win32.pm
inc/Module/Install/WriteAll.pm
lib/IPC/PubSub.pm
+lib/IPC/PubSub/Cache.pm
lib/IPC/PubSub/Cache/DBM_Deep.pm
lib/IPC/PubSub/Cache/JiftyDBI.pm
lib/IPC/PubSub/Cache/JiftyDBI/Stash.pm
@@ -20,5 +21,6 @@
lib/IPC/PubSub/Subscriber.pm
Makefile.PL
MANIFEST This list of files
+META.yml
README
t/basic.t
Modified: IPC-PubSub/META.yml
==============================================================================
--- IPC-PubSub/META.yml (original)
+++ IPC-PubSub/META.yml Wed Oct 25 21:23:20 2006
@@ -12,5 +12,6 @@
Class::Accessor::Fast: 0
DBM::Deep: 0
Data::UUID: 0
- perl: 5.005
-version: 0.11
+ Time::HiRes: 0
+ perl: 5.6.0
+version: 0.20
Modified: IPC-PubSub/Makefile
==============================================================================
--- IPC-PubSub/Makefile (original)
+++ IPC-PubSub/Makefile Wed Oct 25 21:23:20 2006
@@ -17,9 +17,9 @@
# NAME => q[IPC::PubSub]
# NO_META => q[1]
# PL_FILES => { }
-# PREREQ_PM => { DBM::Deep=>q[0], Class::Accessor::Fast=>q[0], Data::UUID=>q[0] }
+# PREREQ_PM => { DBM::Deep=>q[0], Time::HiRes=>q[0], Class::Accessor::Fast=>q[0], Data::UUID=>q[0] }
# SIGN => q[1]
-# VERSION => q[0.11]
+# VERSION => q[0.20]
# dist => { PREOP=>q[$(PERL) -I. -MModule::Install::Admin -e "dist_preop(q($(DISTVNAME)))"] }
# --- MakeMaker post_initialize section:
@@ -60,11 +60,11 @@
DFSEP = $(DIRFILESEP)
NAME = IPC::PubSub
NAME_SYM = IPC_PubSub
-VERSION = 0.11
+VERSION = 0.20
VERSION_MACRO = VERSION
-VERSION_SYM = 0_11
+VERSION_SYM = 0_20
DEFINE_VERSION = -D$(VERSION_MACRO)=\"$(VERSION)\"
-XS_VERSION = 0.11
+XS_VERSION = 0.20
XS_VERSION_MACRO = XS_VERSION
XS_DEFINE_VERSION = -D$(XS_VERSION_MACRO)=\"$(XS_VERSION)\"
INST_ARCHLIB = blib/arch
@@ -181,6 +181,7 @@
TO_INST_PM = lib/IPC/PubSub.pm \
+ lib/IPC/PubSub/Cache.pm \
lib/IPC/PubSub/Cache/DBM_Deep.pm \
lib/IPC/PubSub/Cache/JiftyDBI.pm \
lib/IPC/PubSub/Cache/JiftyDBI/Stash.pm \
@@ -200,6 +201,8 @@
blib/lib/IPC/PubSub.pm \
lib/IPC/PubSub/Cache/Memcached.pm \
blib/lib/IPC/PubSub/Cache/Memcached.pm \
+ lib/IPC/PubSub/Cache.pm \
+ blib/lib/IPC/PubSub/Cache.pm \
lib/IPC/PubSub/Cache/PlainHash.pm \
blib/lib/IPC/PubSub/Cache/PlainHash.pm \
lib/IPC/PubSub/Subscriber.pm \
@@ -280,7 +283,7 @@
DIST_CP = best
DIST_DEFAULT = tardist
DISTNAME = IPC-PubSub
-DISTVNAME = IPC-PubSub-0.11
+DISTVNAME = IPC-PubSub-0.20
# --- MakeMaker macro section:
@@ -785,7 +788,7 @@
# --- MakeMaker ppd section:
# Creates a PPD (Perl Package Description) for a binary distribution.
ppd:
- $(NOECHO) $(ECHO) '<SOFTPKG NAME="$(DISTNAME)" VERSION="0,11,0,0">' > $(DISTNAME).ppd
+ $(NOECHO) $(ECHO) '<SOFTPKG NAME="$(DISTNAME)" VERSION="0,20,0,0">' > $(DISTNAME).ppd
$(NOECHO) $(ECHO) ' <TITLE>$(DISTNAME)</TITLE>' >> $(DISTNAME).ppd
$(NOECHO) $(ECHO) ' <ABSTRACT>Interprocess Publish/Subscribe channels</ABSTRACT>' >> $(DISTNAME).ppd
$(NOECHO) $(ECHO) ' <AUTHOR>Audrey Tang <cpan at audreyt.org></AUTHOR>' >> $(DISTNAME).ppd
@@ -793,6 +796,7 @@
$(NOECHO) $(ECHO) ' <DEPENDENCY NAME="Class-Accessor-Fast" VERSION="0,0,0,0" />' >> $(DISTNAME).ppd
$(NOECHO) $(ECHO) ' <DEPENDENCY NAME="DBM-Deep" VERSION="0,0,0,0" />' >> $(DISTNAME).ppd
$(NOECHO) $(ECHO) ' <DEPENDENCY NAME="Data-UUID" VERSION="0,0,0,0" />' >> $(DISTNAME).ppd
+ $(NOECHO) $(ECHO) ' <DEPENDENCY NAME="Time-HiRes" VERSION="0,0,0,0" />' >> $(DISTNAME).ppd
$(NOECHO) $(ECHO) ' <OS NAME="$(OSNAME)" />' >> $(DISTNAME).ppd
$(NOECHO) $(ECHO) ' <ARCHITECTURE NAME="darwin-2level" />' >> $(DISTNAME).ppd
$(NOECHO) $(ECHO) ' <CODEBASE HREF="" />' >> $(DISTNAME).ppd
@@ -808,6 +812,7 @@
lib/IPC/PubSub/Cache/DBM_Deep.pm blib/lib/IPC/PubSub/Cache/DBM_Deep.pm \
lib/IPC/PubSub.pm blib/lib/IPC/PubSub.pm \
lib/IPC/PubSub/Cache/Memcached.pm blib/lib/IPC/PubSub/Cache/Memcached.pm \
+ lib/IPC/PubSub/Cache.pm blib/lib/IPC/PubSub/Cache.pm \
lib/IPC/PubSub/Cache/PlainHash.pm blib/lib/IPC/PubSub/Cache/PlainHash.pm \
lib/IPC/PubSub/Subscriber.pm blib/lib/IPC/PubSub/Subscriber.pm \
lib/IPC/PubSub/Cache/JiftyDBI/Stash/Publisher.pm blib/lib/IPC/PubSub/Cache/JiftyDBI/Stash/Publisher.pm \
Modified: IPC-PubSub/Makefile.PL
==============================================================================
--- IPC-PubSub/Makefile.PL (original)
+++ IPC-PubSub/Makefile.PL Wed Oct 25 21:23:20 2006
@@ -5,6 +5,7 @@
license 'MIT';
all_from 'lib/IPC/PubSub.pm';
+requires 'Time::HiRes';
requires 'DBM::Deep';
requires 'Data::UUID';
requires 'Class::Accessor::Fast';
Modified: IPC-PubSub/lib/IPC/PubSub.pm
==============================================================================
--- IPC-PubSub/lib/IPC/PubSub.pm (original)
+++ IPC-PubSub/lib/IPC/PubSub.pm Wed Oct 25 21:23:20 2006
@@ -1,8 +1,9 @@
package IPC::PubSub;
-$IPC::PubSub::VERSION = '0.11';
+$IPC::PubSub::VERSION = '0.20';
-use 5.005;
+use 5.006;
use strict;
+use IPC::PubSub::Cacheable;
use IPC::PubSub::Publisher;
use IPC::PubSub::Subscriber;
use base qw/Class::Accessor::Fast/;
@@ -18,7 +19,7 @@
eval { require "IPC/PubSub/Cache/$backend.pm" }
or die "Cannot load backend module: IPC::PubSub::Cache::$backend: $@";
- $self->_cache("IPC::PubSub::Cache::$backend"->new(@_));
+ $self->_cache(IPC::PubSub::Cacheable->new($backend => \@_));
return $self;
}
@@ -36,6 +37,7 @@
sub store { (+shift)->_cache->store(@_) }
sub lock { (+shift)->_cache->lock(@_) }
sub unlock { (+shift)->_cache->unlock(@_) }
+sub modify { (+shift)->_cache->modify(@_) }
1;
Added: IPC-PubSub/lib/IPC/PubSub/Cache.pm
==============================================================================
--- (empty file)
+++ IPC-PubSub/lib/IPC/PubSub/Cache.pm Wed Oct 25 21:23:20 2006
@@ -0,0 +1,79 @@
+package IPC::PubSub::Cache;
+
+use strict;
+use File::Spec;
+use Time::HiRes ();
+#method fetch (Str *@keys --> List of Pair) { ... }
+#method store (Str $key, Str $val, Num $time, Num $expiry) { ... }
+
+#method add_publisher (Str $chan, Str $pub) { ... }
+#method remove_publisher (Str $chan, Str $pub) { ... }
+
+#method get_index (Str $chan, Str $pub --> Int) { ... }
+#method set_index (Str $chan, Str $pub, Int $index) { ... }
+
+#method publisher_indices (Str $chan --> Hash of Int) { ... }
+
+sub modify {
+ my $self = shift;
+ my $key = shift;
+ return (($self->fetch("data-$key"))[0] || [])->[-1] unless @_;
+
+ if (ref($_[0]) eq 'CODE') {
+ $self->lock("lock-$key");
+ local $_ = (($self->fetch("data-$key"))[0] || [])->[-1];
+ my $rv = $_[0]->();
+ $self->store("data-$key" => $_);
+ $self->unlock("lock-$key");
+ return $rv;
+ }
+ else {
+ $self->store("data-$key" => $_[0]);
+ return $_[0]
+ }
+}
+
+sub get {
+ my ($self, $chan, $orig, $curr) = @_;
+
+ no warnings 'uninitialized';
+ sort { $a->[0] <=> $b->[0] } $self->fetch(
+ map {
+ my $pub = $_;
+ my $index = $curr->{$pub};
+ map {
+ "$chan-$pub-$_"
+ } (($orig->{$pub}+1) .. $index);
+ } keys(%$curr)
+ );
+}
+
+sub put {
+ my ($self, $chan, $pub, $index, $msg, $expiry) = @_;
+ $self->store("$chan-$pub-$index", $msg, Time::HiRes::time(), $expiry);
+ $self->set_index($chan, $pub, $index);
+}
+
+
+use constant LOCK => File::Spec->catdir(File::Spec->tmpdir, 'IPC::PubSub-lock-');
+
+my %locks;
+sub lock {
+ my ($self, $chan) = @_;
+ for my $i (1..10) {
+ return if mkdir((LOCK . unpack("H*", $chan)), 0777);
+ Time::HiRes::usleep(rand(250000)+250000);
+ }
+}
+
+END {
+ rmdir(LOCK . unpack("H*", $_)) for keys %locks;
+}
+
+sub unlock {
+ my ($self, $chan) = @_;
+ rmdir(LOCK . unpack("H*", $chan));
+ delete $locks{$chan};
+}
+
+1;
Modified: IPC-PubSub/lib/IPC/PubSub/Cache/DBM_Deep.pm
==============================================================================
--- IPC-PubSub/lib/IPC/PubSub/Cache/DBM_Deep.pm (original)
+++ IPC-PubSub/lib/IPC/PubSub/Cache/DBM_Deep.pm Wed Oct 25 21:23:20 2006
@@ -1,6 +1,6 @@
package IPC::PubSub::Cache::DBM_Deep;
use strict;
-use base 'IPC::PubSub::Cacheable';
+use base 'IPC::PubSub::Cache';
use DBM::Deep;
use File::Temp qw/ tempfile /;
Modified: IPC-PubSub/lib/IPC/PubSub/Cache/JiftyDBI.pm
==============================================================================
--- IPC-PubSub/lib/IPC/PubSub/Cache/JiftyDBI.pm (original)
+++ IPC-PubSub/lib/IPC/PubSub/Cache/JiftyDBI.pm Wed Oct 25 21:23:20 2006
@@ -1,6 +1,6 @@
package IPC::PubSub::Cache::JiftyDBI;
use strict;
-use base 'IPC::PubSub::Cacheable';
+use base 'IPC::PubSub::Cache';
use IPC::PubSub::Cache::JiftyDBI::Stash;
my %cache;
@@ -37,7 +37,15 @@
my ($self, $key, $val, $time, $expiry) = @_;
$expiry ||= 0;
my $item = IPC::PubSub::Cache::JiftyDBI::Stash::Item->new(handle => $STASH->handle);
- $item->create( key => $key, expiry => ($time+$expiry), val => $val);
+
+ $item->load_by_cols( key => $key );
+ if ( $item->id ) {
+ $item->set_val($val);
+ $item->set_expiry($time+$expiry);
+ }
+ else {
+ $item->create( key => $key, expiry => ($time+$expiry), val => $val );
+ }
}
sub publisher_indices {
Modified: IPC-PubSub/lib/IPC/PubSub/Cache/JiftyDBI/Stash.pm
==============================================================================
--- IPC-PubSub/lib/IPC/PubSub/Cache/JiftyDBI/Stash.pm (original)
+++ IPC-PubSub/lib/IPC/PubSub/Cache/JiftyDBI/Stash.pm Wed Oct 25 21:23:20 2006
@@ -18,8 +18,21 @@
my %args = (
db_init => 0,
db_config => undef,
+ db_handle => undef,
+ table_prefix => 'pubsub_',
@_
);
+
+ if ($args{'table_prefix'}) {
+
+ IPC::PubSub::Cache::JiftyDBI::Stash::Item->table_prefix($args{'table_prefix'});
+ IPC::PubSub::Cache::JiftyDBI::Stash::Publisher->table_prefix($args{'table_prefix'});
+ }
+
+ if ($args{'db_handle'}) {
+ $self->handle($args{'db_handle'});
+
+ } else {
unless ( $args{'db_config'} ) {
my $filename;
( undef, $filename ) = tempfile();
@@ -28,10 +41,12 @@
}
$self->_connect( %{$args{'db_config'}} );
+
+ }
if ( $args{'db_init'} ) {
$self->_generate_db();
}
- $self;
+ return $self;
}
sub handle {
Modified: IPC-PubSub/lib/IPC/PubSub/Cache/JiftyDBI/Stash/Item.pm
==============================================================================
--- IPC-PubSub/lib/IPC/PubSub/Cache/JiftyDBI/Stash/Item.pm (original)
+++ IPC-PubSub/lib/IPC/PubSub/Cache/JiftyDBI/Stash/Item.pm Wed Oct 25 21:23:20 2006
@@ -3,6 +3,8 @@
use warnings;
use strict;
+use vars qw/$TABLE_PREFIX/;
+
use Jifty::DBI::Schema;
use Jifty::DBI::Record schema {
column key => type is 'text';
@@ -10,6 +12,19 @@
column expiry => type is 'int';
};
+
+sub table_prefix {
+ my $self = shift;
+ $TABLE_PREFIX = shift if (@_);
+ return ($TABLE_PREFIX);
+}
+
+sub table {
+ my $self = shift;
+ return $self->table_prefix . $self->SUPER::table();
+}
+
+
package IPC::PubSub::Cache::JiftyDBI::Stash::ItemCollection;
use base qw/Jifty::DBI::Collection/;
Modified: IPC-PubSub/lib/IPC/PubSub/Cache/JiftyDBI/Stash/Publisher.pm
==============================================================================
--- IPC-PubSub/lib/IPC/PubSub/Cache/JiftyDBI/Stash/Publisher.pm (original)
+++ IPC-PubSub/lib/IPC/PubSub/Cache/JiftyDBI/Stash/Publisher.pm Wed Oct 25 21:23:20 2006
@@ -3,6 +3,8 @@
use warnings;
use strict;
+use vars qw/$TABLE_PREFIX/;
+
use Jifty::DBI::Schema;
use Jifty::DBI::Record schema {
column channel => type is 'text';
@@ -11,6 +13,17 @@
};
+sub table_prefix {
+ my $self = shift;
+ $TABLE_PREFIX = shift if (@_);
+ return ($TABLE_PREFIX);
+}
+
+sub table {
+ my $self = shift;
+ return $self->table_prefix . $self->SUPER::table();
+}
+
package IPC::PubSub::Cache::JiftyDBI::Stash::PublisherCollection;
use base qw/Jifty::DBI::Collection/;
Modified: IPC-PubSub/lib/IPC/PubSub/Cache/Memcached.pm
==============================================================================
--- IPC-PubSub/lib/IPC/PubSub/Cache/Memcached.pm (original)
+++ IPC-PubSub/lib/IPC/PubSub/Cache/Memcached.pm Wed Oct 25 21:23:20 2006
@@ -1,7 +1,8 @@
package IPC::PubSub::Cache::Memcached;
use strict;
-use base 'IPC::PubSub::Cacheable';
+use base 'IPC::PubSub::Cache';
use Cache::Memcached;
+use Time::HiRes ();
sub new {
my $class = shift;
@@ -38,18 +39,14 @@
sub lock {
my ($self, $chan) = @_;
- warn "trying to acquire lock: $chan\n";
- for my $i (1..10) {
+ for my $i (1..100) {
return if $$self->add("$chan#lock" => 1);
- warn "contention: $chan\n";
- return;
- sleep 1;
+ Time::HiRes::usleep(rand(250000)+250000);
}
}
sub unlock {
my ($self, $chan) = @_;
- warn "trying to release lock: $chan\n";
$$self->delete("$chan#lock");
}
Modified: IPC-PubSub/lib/IPC/PubSub/Cache/PlainHash.pm
==============================================================================
--- IPC-PubSub/lib/IPC/PubSub/Cache/PlainHash.pm (original)
+++ IPC-PubSub/lib/IPC/PubSub/Cache/PlainHash.pm Wed Oct 25 21:23:20 2006
@@ -1,6 +1,6 @@
package IPC::PubSub::Cache::PlainHash;
use strict;
-use base 'IPC::PubSub::Cacheable';
+use base 'IPC::PubSub::Cache';
my %cache;
Modified: IPC-PubSub/lib/IPC/PubSub/Cacheable.pm
==============================================================================
--- IPC-PubSub/lib/IPC/PubSub/Cacheable.pm (original)
+++ IPC-PubSub/lib/IPC/PubSub/Cacheable.pm Wed Oct 25 21:23:20 2006
@@ -1,60 +1,41 @@
package IPC::PubSub::Cacheable;
use strict;
-use Time::HiRes 'time';
-use File::Spec;
+use Scalar::Util qw( refaddr );
-#method fetch (Str *@keys --> List of Pair) { ... }
-#method store (Str $key, Str $val, Num $time, Num $expiry) { ... }
-
-#method add_publisher (Str $chan, Str $pub) { ... }
-#method remove_publisher (Str $chan, Str $pub) { ... }
-
-#method get_index (Str $chan, Str $pub --> Int) { ... }
-#method set_index (Str $chan, Str $pub, Int $index) { ... }
-
-#method publisher_indices (Str $chan --> Hash of Int) { ... }
-
-sub get {
- my ($self, $chan, $orig, $curr) = @_;
-
- no warnings 'uninitialized';
- sort { $a->[0] <=> $b->[0] } $self->fetch(
- map {
- my $pub = $_;
- my $index = $curr->{$pub};
- map {
- "$chan-$pub-$_"
- } (($orig->{$pub}+1) .. $index);
- } keys(%$curr)
- );
+my %Cache;
+sub new {
+ my $class = shift;
+ my $self = bless(\@_, $class);
+ $self->BUILD;
+ return $self;
}
-sub put {
- my ($self, $chan, $pub, $index, $msg, $expiry) = @_;
- $self->store("$chan-$pub-$index", $msg, time, $expiry);
- $self->set_index($chan, $pub, $index);
+sub BUILD {
+ my $self = shift;
+ $Cache{ refaddr($self) } ||= do {
+ require "IPC/PubSub/Cache/$self->[0].pm";
+ "IPC::PubSub::Cache::$self->[0]"->new(@{$self->[1]});
+ };
}
+sub AUTOLOAD {
+ no strict 'refs';
+ no warnings 'uninitialized';
-use constant LOCK => File::Spec->catdir(File::Spec->tmpdir, 'IPC::PubSub-lock-');
-
-my %locks;
-sub lock {
- my ($self, $chan) = @_;
- for my $i (1..10) {
- return if mkdir((LOCK . unpack("H*", $chan)), 0777);
- sleep 1;
- }
-}
-
-END {
- rmdir(LOCK . unpack("H*", $_)) for keys %locks;
+ my $meth = (substr(our $AUTOLOAD, rindex($AUTOLOAD, '::') + 2) || $AUTOLOAD);
+ my $code = sub {
+ my $self = shift;
+ my $cache = $self->BUILD;
+ unshift @_, $cache;
+ goto &{$cache->can($meth)};
+ };
+ *$meth = $code;
+ goto &$code;
}
-sub unlock {
- my ($self, $chan) = @_;
- rmdir(LOCK . unpack("H*", $chan));
- delete $locks{$chan};
+sub DESTROY {
+ my $self = shift;
+ delete $Cache{ refaddr($self) };
}
1;
More information about the Rt-commit mailing list