[Rt-commit] r6266 - in MessageBus: . lib lib/MessageBus/Cache
lib/MessageBus/Cache/JiftyDBI/Stash t
audreyt at bestpractical.com
audreyt at bestpractical.com
Tue Oct 24 22:10:09 EDT 2006
Author: audreyt
Date: Tue Oct 24 22:10:08 2006
New Revision: 6266
Added:
MessageBus/lib/MessageBus/Cache/JiftyDBI/
MessageBus/lib/MessageBus/Cache/JiftyDBI.pm
MessageBus/lib/MessageBus/Cache/JiftyDBI/Stash/
MessageBus/lib/MessageBus/Cache/JiftyDBI/Stash.pm
MessageBus/lib/MessageBus/Cache/JiftyDBI/Stash/Item.pm
MessageBus/lib/MessageBus/Cache/JiftyDBI/Stash/Publisher.pm
Modified:
MessageBus/Changes
MessageBus/lib/MessageBus.pm
MessageBus/lib/MessageBus/Cache/DBM_Deep.pm
MessageBus/lib/MessageBus/Cache/Memcached.pm
MessageBus/lib/MessageBus/Cache/PlainHash.pm
MessageBus/t/basic.t
Log:
* 0.03
Modified: MessageBus/Changes
==============================================================================
--- MessageBus/Changes (original)
+++ MessageBus/Changes Tue Oct 24 22:10:08 2006
@@ -1,3 +1,9 @@
+[Changes for 0.03 - 2006-10-24]
+
+* New backend: JiftyDBI.
+* Multiple publishers now work in DBM_Deep and Memcached backends.
+* Memcached now atomically handles publisher announcement and removal.
+
[Changes for 0.02 - 2006-10-24]
* Thanks to mstrout++ this thing actually works now. :-)
Modified: MessageBus/lib/MessageBus.pm
==============================================================================
--- MessageBus/lib/MessageBus.pm (original)
+++ MessageBus/lib/MessageBus.pm Tue Oct 24 22:10:08 2006
@@ -1,5 +1,5 @@
package MessageBus;
-$MessageBus::VERSION = '0.02';
+$MessageBus::VERSION = '0.03';
use 5.005;
use strict;
Modified: MessageBus/lib/MessageBus/Cache/DBM_Deep.pm
==============================================================================
--- MessageBus/lib/MessageBus/Cache/DBM_Deep.pm (original)
+++ MessageBus/lib/MessageBus/Cache/DBM_Deep.pm Tue Oct 24 22:10:08 2006
@@ -33,14 +33,14 @@
sub add_publisher {
my ($self, $chan, $pub) = @_;
- my $pubs = $$self->get("$chan#") || {};
+ my $pubs = { %{ $$self->get("$chan#") || {} } };
$pubs->{$pub} = 0;
$$self->put("$chan#", $pubs);
}
sub remove_publisher {
my ($self, $chan, $pub) = @_;
- my $pubs = $$self->get("$chan#") || {};
+ my $pubs = { %{ $$self->get("$chan#") || {} } };
delete $pubs->{$pub};
$$self->put("$chan#", $pubs);
}
@@ -52,7 +52,7 @@
sub set_index {
my ($self, $chan, $pub, $idx) = @_;
- my $pubs = $$self->get("$chan#") || {};
+ my $pubs = { %{ $$self->get("$chan#") || {} } };
$pubs->{$pub} = $idx;
$$self->put("$chan#", $pubs);
}
Added: MessageBus/lib/MessageBus/Cache/JiftyDBI.pm
==============================================================================
--- (empty file)
+++ MessageBus/lib/MessageBus/Cache/JiftyDBI.pm Tue Oct 24 22:10:08 2006
@@ -0,0 +1,121 @@
+package MessageBus::Cache::JiftyDBI;
+use strict;
+use base 'MessageBus::Cacheable';
+use MessageBus::Cache::JiftyDBI::Stash;
+
+# MessageBus cache backend that stores items and publisher indices through
+# the Jifty::DBI record and collection classes defined under ...::Stash.
+
+# One stash (and so one database handle) is shared by all instances.
+use vars qw/$STASH/;
+
+# new(@stash_args)
+# Returns a new backend object.  The shared stash is built from @stash_args
+# on the first call only; later calls reuse it and ignore their arguments.
+sub new {
+    my $class = shift;
+    my $self  = bless {}, $class;
+    $STASH ||= MessageBus::Cache::JiftyDBI::Stash->new(@_);
+    return $self;
+}
+
+# fetch(@keys)
+# Returns one entry per key, in the order given: an [expiry, val] array ref
+# for a key that has a row, undef for a key that does not.
+sub fetch {
+    my ($self, @keys_in_order) = @_;
+
+    my $items = MessageBus::Cache::JiftyDBI::Stash::ItemCollection->new( handle => $STASH->handle );
+    for my $key (@keys_in_order) {
+        $items->limit(
+            column           => 'key',
+            entry_aggregator => 'or',
+            value            => $key,
+        );
+    }
+
+    my %found;
+    for my $item ( @{ $items->items_array_ref } ) {
+        $found{ $item->key } = [ $item->expiry, $item->val ];
+    }
+    return @found{@keys_in_order};
+}
+
+# store($key, $val, $time, $expiry)
+# Creates an item row for $key holding $val, with the expiry column set to
+# $time + $expiry ($expiry defaults to 0).
+sub store {
+    my ($self, $key, $val, $time, $expiry) = @_;
+    $expiry ||= 0;
+    my $item = MessageBus::Cache::JiftyDBI::Stash::Item->new( handle => $STASH->handle );
+    return $item->create( key => $key, expiry => $time + $expiry, val => $val );
+}
+
+# publisher_indices($chan)
+# Returns a hash ref mapping each publisher name on $chan to its index.
+sub publisher_indices {
+    my ($self, $chan) = @_;
+    my $publishers = MessageBus::Cache::JiftyDBI::Stash::PublisherCollection->new( handle => $STASH->handle );
+    $publishers->limit( column => 'channel', value => $chan );
+
+    my %indices;
+    for my $publisher ( @{ $publishers->items_array_ref } ) {
+        $indices{ $publisher->name } = $publisher->idx;
+    }
+    return \%indices;
+}
+
+# add_publisher($chan, $pub)
+# Registers $pub on $chan with index 0.  A publisher that already has a row
+# gets its index reset to 0 instead of a duplicate row, matching the
+# hash-based backends, where re-adding simply overwrites the entry.
+sub add_publisher {
+    my ($self, $chan, $pub) = @_;
+
+    my $existing = _get_publisher( $chan => $pub );
+    if ( $existing->id ) {
+        $existing->set_idx(0);
+        return $existing->id;
+    }
+
+    my $publisher = MessageBus::Cache::JiftyDBI::Stash::Publisher->new( handle => $STASH->handle );
+    return $publisher->create( channel => $chan, name => $pub, idx => 0 );
+}
+
+# remove_publisher($chan, $pub)
+# Deletes the row for $pub on $chan; does nothing when no row was loaded.
+sub remove_publisher {
+    my ($self, $chan, $pub) = @_;
+    my $publisher = _get_publisher( $chan => $pub );
+    return unless $publisher->id;    # no such publisher, nothing to delete
+    return $publisher->delete;
+}
+
+# get_index($chan, $pub)
+# Returns the stored index for $pub on $chan, or undef when no row exists.
+sub get_index {
+    my ($self, $chan, $pub) = @_;
+    my $publisher = _get_publisher( $chan => $pub );
+    return $publisher->id ? $publisher->idx : undef;
+}
+
+# set_index($chan, $pub, $idx)
+# Sets the stored index for $pub on $chan via the record's set_idx, and
+# returns that call's result.
+sub set_index {
+    my ($self, $chan, $pub, $idx) = @_;
+    return _get_publisher( $chan => $pub )->set_idx($idx);
+}
+
+# _get_publisher($chan, $pub)
+# Plain function, not a method.  Returns a Publisher record on which
+# load_by_cols has been attempted for that channel and name; callers check
+# ->id to see whether a row was actually loaded.
+sub _get_publisher {
+    my ($chan, $pub) = @_;
+    my $publisher = MessageBus::Cache::JiftyDBI::Stash::Publisher->new( handle => $STASH->handle );
+    $publisher->load_by_cols( channel => $chan, name => $pub );
+    return $publisher;
+}
+
+1;
Added: MessageBus/lib/MessageBus/Cache/JiftyDBI/Stash.pm
==============================================================================
--- (empty file)
+++ MessageBus/lib/MessageBus/Cache/JiftyDBI/Stash.pm Tue Oct 24 22:10:08 2006
@@ -0,0 +1,76 @@
+package MessageBus::Cache::JiftyDBI::Stash;
+use warnings;
+use strict;
+
+# Owns the Jifty::DBI database handle used by the JiftyDBI cache backend and
+# can create the Item and Publisher tables on request.
+
+use Jifty::DBI::Handle;
+use Jifty::DBI::SchemaGenerator;
+use File::Temp qw/ tempfile tempdir /;
+use MessageBus::Cache::JiftyDBI::Stash::Item;
+use MessageBus::Cache::JiftyDBI::Stash::Publisher;
+
+# new(%args)
+#   db_config - hash ref passed (flattened) to Jifty::DBI::Handle->connect;
+#               when absent, an SQLite database in a fresh temp file is used.
+#   db_init   - true to create the Item and Publisher tables after connecting.
+# Returns the connected stash object.
+sub new {
+    my $class = shift;
+    my %args  = (
+        db_init   => 0,
+        db_config => undef,
+        @_,
+    );
+    my $self = bless {}, $class;
+
+    unless ( $args{'db_config'} ) {
+        # Only the name is needed; the handle tempfile() opened is dropped.
+        my ( undef, $filename ) = tempfile();
+        $args{'db_config'} = { driver => 'SQLite', database => $filename };
+    }
+
+    $self->_connect( %{ $args{'db_config'} } );
+    $self->_generate_db() if $args{'db_init'};
+    return $self;
+}
+
+# handle([$new_handle])
+# Returns the stored database handle, replacing it first when an argument
+# is given.
+sub handle {
+    my $self = shift;
+    $self->{'handle'} = shift if @_;
+    return $self->{'handle'};
+}
+
+# _generate_db()
+# Generates CREATE TABLE statements for the Item and Publisher models and
+# runs them inside one transaction.  Statement results are not inspected.
+sub _generate_db {
+    my $self   = shift;
+    my $handle = $self->handle;
+
+    my $gen = Jifty::DBI::SchemaGenerator->new($handle);
+    $gen->add_model( MessageBus::Cache::JiftyDBI::Stash::Item->new( handle => $handle ) );
+    $gen->add_model( MessageBus::Cache::JiftyDBI::Stash::Publisher->new( handle => $handle ) );
+
+    $handle->begin_transaction;
+    for my $statement ( $gen->create_table_sql_statements ) {
+        $handle->simple_query($statement);
+    }
+    return $handle->commit;
+}
+
+# _connect(%connect_args)
+# Opens a new Jifty::DBI::Handle with the given connect arguments and
+# stores it as this stash's handle.
+sub _connect {
+    my $self   = shift;
+    my $handle = Jifty::DBI::Handle->new();
+    $handle->connect(@_);
+    return $self->handle($handle);
+}
+
+1;
Added: MessageBus/lib/MessageBus/Cache/JiftyDBI/Stash/Item.pm
==============================================================================
--- (empty file)
+++ MessageBus/lib/MessageBus/Cache/JiftyDBI/Stash/Item.pm Tue Oct 24 22:10:08 2006
@@ -0,0 +1,25 @@
+package MessageBus::Cache::JiftyDBI::Stash::Item;
+
+use warnings;
+use strict;
+
+# Record class for one cached item: a text key, a blob value run through
+# the Storable filter, and an integer expiry.
+use Jifty::DBI::Schema;
+use Jifty::DBI::Record schema {
+    column key    => type is 'text';
+    column val    => type is 'blob', filters are 'Jifty::DBI::Filter::Storable';
+    column expiry => type is 'int';
+};
+
+package MessageBus::Cache::JiftyDBI::Stash::ItemCollection;
+use base qw/Jifty::DBI::Collection/;
+
+# table()
+# Returns the table name of the record that new_item produces.
+sub table {
+    my ($self) = @_;
+    return $self->new_item->table();
+}
+
+1;
Added: MessageBus/lib/MessageBus/Cache/JiftyDBI/Stash/Publisher.pm
==============================================================================
--- (empty file)
+++ MessageBus/lib/MessageBus/Cache/JiftyDBI/Stash/Publisher.pm Tue Oct 24 22:10:08 2006
@@ -0,0 +1,25 @@
+package MessageBus::Cache::JiftyDBI::Stash::Publisher;
+
+use warnings;
+use strict;
+
+# Record class for one publisher registration: the channel and publisher
+# name as text, plus an integer index.
+use Jifty::DBI::Schema;
+use Jifty::DBI::Record schema {
+    column channel => type is 'text';
+    column name    => type is 'text';
+    column idx     => type is 'int';
+};
+
+package MessageBus::Cache::JiftyDBI::Stash::PublisherCollection;
+use base qw/Jifty::DBI::Collection/;
+
+# table()
+# Returns the table name of the record that new_item produces.
+sub table {
+    my ($self) = @_;
+    return $self->new_item->table();
+}
+
+1;
Modified: MessageBus/lib/MessageBus/Cache/Memcached.pm
==============================================================================
--- MessageBus/lib/MessageBus/Cache/Memcached.pm (original)
+++ MessageBus/lib/MessageBus/Cache/Memcached.pm Tue Oct 24 22:10:08 2006
@@ -32,18 +32,35 @@
$$self->get("$chan#") || {};
}
+# Take "$chan#lock": up to 10 add() tries, 1s apart; 30s expiry frees a dead holder's lock.
+sub lock {
+    my ($self, $chan) = @_;
+    for (1 .. 10) { return 1 if $$self->add("$chan#lock" => 1, 30); sleep 1 }
+    warn "MessageBus: could not lock channel '$chan'; continuing unlocked\n";
+    return 0;    # 1 = lock held, 0 = timed out
+}
+
+sub unlock {    # Release the channel lock by deleting its key.
+    my ($self, $chan) = @_;
+    return $$self->delete("$chan#lock");
+}
+
sub add_publisher {
my ($self, $chan, $pub) = @_;
+ $self->lock($chan);
my $pubs = $$self->get("$chan#") || {};
$pubs->{$pub} = 0;
$$self->set("$chan#", $pubs);
+ $self->unlock($chan);
}
sub remove_publisher {
my ($self, $chan, $pub) = @_;
+ $self->lock($chan);
my $pubs = $$self->get("$chan#") || {};
delete $pubs->{$pub};
$$self->set("$chan#", $pubs);
+ $self->unlock($chan);
}
sub get_index {
Modified: MessageBus/lib/MessageBus/Cache/PlainHash.pm
==============================================================================
--- MessageBus/lib/MessageBus/Cache/PlainHash.pm (original)
+++ MessageBus/lib/MessageBus/Cache/PlainHash.pm Tue Oct 24 22:10:08 2006
@@ -17,8 +17,8 @@
}
sub publisher_indices {
- my ($self, $chan) = @_;
- +{%{$cache{$chan}||{}}};
+ my ( $self, $chan ) = @_;
+ +{ %{ $cache{$chan} || {} } };
}
sub add_publisher {
Modified: MessageBus/t/basic.t
==============================================================================
--- MessageBus/t/basic.t (original)
+++ MessageBus/t/basic.t Tue Oct 24 22:10:08 2006
@@ -1,17 +1,28 @@
use strict;
-use Test::More tests => 18;
+use warnings;
+use Test::More;
use MessageBus;
use IO::Socket::INET;
-my @backends = qw(PlainHash DBM_Deep Memcached);
+my @backends = qw(PlainHash);
+
+unshift @backends, 'DBM_Deep' if eval { require DBM::Deep };
+unshift @backends, 'JiftyDBI' if eval { require Jifty::DBI };
+# Keep Memcached last: its skip() does "last SKIP", ending the backend loop.
+push @backends, 'Memcached' if eval { require Cache::Memcached };
+plan tests => 6 * scalar @backends;
SKIP: for my $backend (@backends) {
+ my %init_args = ();
+ diag('Testing backend '.$backend);
if ($backend eq 'Memcached') {
my $sock = IO::Socket::INET->new('127.0.0.1:11211')
or skip("Memcached not started", 6);
+ } elsif ($backend eq 'JiftyDBI') {
+ $init_args{db_init} = 1;
}
- my $bus = MessageBus->new($backend);
+ my $bus = MessageBus->new($backend, %init_args);
my @sub; $sub[0] = $bus->subscribe;
@@ -24,12 +35,12 @@
$sub[1] = $bus->subscribe;
- $pub->msg('bar');
+ $pub->msg(['bar', 'bar']);
$pub->msg('baz');
- is_deeply([$sub[0]->get], [qw< foo bar baz >], 'get worked');
+ is_deeply([$sub[0]->get], ['foo', ['bar', 'bar'], 'baz'], 'get worked');
is_deeply([$sub[0]->get], [], 'get emptied the cache');
- is_deeply([map {$_->[1]} @{$sub[1]->get_all->{''}}], [qw< bar baz >], 'get_all worked');
+ is_deeply([map {$_->[1]} @{$sub[1]->get_all->{''}}], [['bar', 'bar'], 'baz'], 'get_all worked');
is_deeply([map {$_->[1]} @{$sub[1]->get_all->{''}}], [], 'get_all emptied the cache');
}
More information about the Rt-commit
mailing list