[Rt-commit] r6303 - in IPC-PubSub: lib/IPC/PubSub
lib/IPC/PubSub/Cache
audreyt at bestpractical.com
audreyt at bestpractical.com
Thu Oct 26 16:02:33 EDT 2006
Author: audreyt
Date: Thu Oct 26 16:02:32 2006
New Revision: 6303
Modified:
IPC-PubSub/lib/IPC/PubSub/Cache.pm
IPC-PubSub/lib/IPC/PubSub/Cache/DBM_Deep.pm
IPC-PubSub/lib/IPC/PubSub/Cache/Memcached.pm
IPC-PubSub/t/basic.t
Log:
* Speed up on ->modify calls; add tests for ->modify.
Modified: IPC-PubSub/lib/IPC/PubSub/Cache.pm
==============================================================================
--- IPC-PubSub/lib/IPC/PubSub/Cache.pm (original)
+++ IPC-PubSub/lib/IPC/PubSub/Cache.pm Thu Oct 26 16:02:32 2006
@@ -3,6 +3,7 @@
use strict;
use File::Spec;
use Time::HiRes ();
+
#method fetch (Str *@keys --> List of Pair) { ... }
#method store (Str $key, Str $val, Num $time, Num $expiry) { ... }
@@ -14,22 +15,37 @@
#method publisher_indices (Str $chan --> Hash of Int) { ... }
+sub fetch_data {
+ my $self = shift;
+ my $key = shift;
+ return (($self->fetch("data:$key"))[0] || [])->[-1];
+}
+
+sub store_data {
+ my $self = shift;
+ my $key = shift;
+ my $val = shift;
+ $self->store("data:$key" => $val, -1, 0);
+}
+
sub modify {
my $self = shift;
my $key = shift;
- return (($self->fetch("data-$key"))[0] || [])->[-1] unless @_;
+ return $self->fetch_data($key) unless @_;
+
+ my $with = shift;
- if (ref($_[0]) eq 'CODE') {
- $self->lock("lock-$key");
- local $_ = (($self->fetch("data-$key"))[0] || [])->[-1];
- my $rv = $_[0]->();
- $self->store("data-$key" => $_);
- $self->unlock("lock-$key");
+ if (ref($with) eq 'CODE') {
+ $self->lock("data:$key");
+ local $_ = $self->fetch_data($key);
+ my $rv = $with->();
+ $self->store_data($key => $_);
+ $self->unlock("data:$key");
return $rv;
}
else {
- $self->store("data-$key" => $_[0]);
- return $_[0]
+ $self->store_data($key => $with);
+ return $with;
}
}
@@ -42,7 +58,7 @@
my $pub = $_;
my $index = $curr->{$pub};
map {
- "$chan-$pub-$_"
+ "chan:$chan-$pub$_"
} (($orig->{$pub}+1) .. $index);
} keys(%$curr)
);
@@ -50,12 +66,12 @@
sub put {
my ($self, $chan, $pub, $index, $msg, $expiry) = @_;
- $self->store("$chan-$pub-$index", $msg, Time::HiRes::time(), $expiry);
+ $self->store("chan:$chan-$pub$index", $msg, Time::HiRes::time(), $expiry);
$self->set_index($chan, $pub, $index);
}
-use constant LOCK => File::Spec->catdir(File::Spec->tmpdir, 'IPC::PubSub-lock-');
+use constant LOCK => File::Spec->catdir(File::Spec->tmpdir, 'IPC-PubSub-lock-');
my %locks;
sub lock {
Modified: IPC-PubSub/lib/IPC/PubSub/Cache/DBM_Deep.pm
==============================================================================
--- IPC-PubSub/lib/IPC/PubSub/Cache/DBM_Deep.pm (original)
+++ IPC-PubSub/lib/IPC/PubSub/Cache/DBM_Deep.pm Thu Oct 26 16:02:32 2006
@@ -28,33 +28,33 @@
sub publisher_indices {
my ($self, $chan) = @_;
- return { %{ $$self->get("$chan#") || {} } };
+ return { %{ $$self->get("pubs:$chan") || {} } };
}
sub add_publisher {
my ($self, $chan, $pub) = @_;
- my $pubs = { %{ $$self->get("$chan#") || {} } };
+ my $pubs = { %{ $$self->get("pubs:$chan") || {} } };
$pubs->{$pub} = 0;
- $$self->put("$chan#", $pubs);
+ $$self->put("pubs:$chan", $pubs);
}
sub remove_publisher {
my ($self, $chan, $pub) = @_;
- my $pubs = { %{ $$self->get("$chan#") || {} } };
+ my $pubs = { %{ $$self->get("pubs:$chan") || {} } };
delete $pubs->{$pub};
- $$self->put("$chan#", $pubs);
+ $$self->put("pubs:$chan", $pubs);
}
sub get_index {
my ($self, $chan, $pub) = @_;
- ($$self->get("$chan#") || {})->{$pub};
+ ($$self->get("pubs:$chan") || {})->{$pub};
}
sub set_index {
my ($self, $chan, $pub, $idx) = @_;
- my $pubs = { %{ $$self->get("$chan#") || {} } };
+ my $pubs = { %{ $$self->get("pubs:$chan") || {} } };
$pubs->{$pub} = $idx;
- $$self->put("$chan#", $pubs);
+ $$self->put("pubs:$chan", $pubs);
}
1;
Modified: IPC-PubSub/lib/IPC/PubSub/Cache/Memcached.pm
==============================================================================
--- IPC-PubSub/lib/IPC/PubSub/Cache/Memcached.pm (original)
+++ IPC-PubSub/lib/IPC/PubSub/Cache/Memcached.pm Thu Oct 26 16:02:32 2006
@@ -21,9 +21,21 @@
};
}
+sub fetch_data {
+ my $self = shift;
+ my $key = shift;
+ return $$self->get("data:$key");
+}
+
+sub store_data {
+ my $self = shift;
+ my $key = shift;
+ my $val = shift;
+ return $$self->set("data:$key" => $val);
+}
+
sub fetch {
my $self = shift;
- die "Rejecting insanity" if @_ > 100;
values(%{$$self->get_multi(@_)});
}
@@ -34,50 +46,52 @@
sub publisher_indices {
my ($self, $chan) = @_;
- $$self->get("$chan#") || {};
+ $$self->get("pubs:$chan") || {};
}
sub lock {
- my ($self, $chan) = @_;
+ my ($self, $key) = @_;
for my $i (1..100) {
- return if $$self->add("$chan#lock" => 1);
+ return if $$self->add("lock:$key" => 1);
Time::HiRes::usleep(rand(250000)+250000);
}
}
sub unlock {
my ($self, $chan) = @_;
- $$self->delete("$chan#lock");
+ $$self->delete("lock:$chan");
}
sub add_publisher {
my ($self, $chan, $pub) = @_;
- $self->lock($chan);
- my $pubs = $$self->get("$chan#") || {};
+ my $key = "pubs:$chan";
+ $self->lock($key);
+ my $pubs = $$self->get($key) || {};
$pubs->{$pub} = 0;
- $$self->set("$chan#", $pubs);
- $self->unlock($chan);
+ $$self->set($key => $pubs);
+ $self->unlock($key);
}
sub remove_publisher {
my ($self, $chan, $pub) = @_;
- $self->lock($chan);
- my $pubs = $$self->get("$chan#") || {};
+ my $key = "pubs:$chan";
+ $self->lock($key);
+ my $pubs = $$self->get($key) || {};
delete $pubs->{$pub};
- $$self->set("$chan#", $pubs);
- $self->unlock($chan);
+ $$self->set($key => $pubs);
+ $self->unlock($key);
}
sub get_index {
my ($self, $chan, $pub) = @_;
- ($$self->get("$chan#") || {})->{$pub};
+ ($$self->get("pubs:$chan") || {})->{$pub};
}
sub set_index {
my ($self, $chan, $pub, $idx) = @_;
- my $pubs = $$self->get("$chan#") || {};
+ my $pubs = $$self->get("pubs:$chan") || {};
$pubs->{$pub} = $idx;
- $$self->set("$chan#", $pubs);
+ $$self->set("pubs:$chan", $pubs);
}
1;
Modified: IPC-PubSub/t/basic.t
==============================================================================
--- IPC-PubSub/t/basic.t (original)
+++ IPC-PubSub/t/basic.t Thu Oct 26 16:02:32 2006
@@ -10,16 +10,17 @@
unshift @backends, 'JiftyDBI' if eval { require Jifty::DBI };
unshift @backends, 'Memcached' if eval { require Cache::Memcached } and IO::Socket::INET->new('127.0.0.1:11211');
-plan tests => 6 * scalar @backends;
+plan tests => 12 * scalar @backends;
+
+my %init_args = (
+ JiftyDBI => [ db_init => 1 ],
+ Memcached => [ rand() . $$ ],
+);
SKIP: for my $backend (@backends) {
- my %init_args = ();
- diag('Testing backend '.$backend);
- if ($backend eq 'JiftyDBI') {
- $init_args{db_init} = 1;
- }
+ diag("Testing backend $backend");
- my $bus = IPC::PubSub->new($backend, %init_args);
+ my $bus = IPC::PubSub->new($backend, @{$init_args{$backend}});
my @sub; $sub[0] = $bus->new_subscriber;
@@ -40,4 +41,11 @@
is_deeply([map {$_->[1]} @{$sub[1]->get_all->{''}}], [['bar', 'bar'], 'baz'], 'get_all worked');
is_deeply([map {$_->[1]} @{$sub[1]->get_all->{''}}], [], 'get_all emptied the cache');
+
+ is($bus->modify('key'), undef, 'modify (1)');
+ is($bus->modify('key' => 'val'), 'val', 'modify (2)');
+ is($bus->modify('key'), 'val', 'modify (3)');
+ is($bus->modify('key' => sub { s/v/h/ }), 1, 'modify (4)');
+ is($bus->modify('key'), 'hal', 'modify (5)');
+ is($bus->modify('key' => undef), undef, 'modify (6)');
}
More information about the Rt-commit
mailing list