[Rt-commit] r6266 - in MessageBus: . lib lib/MessageBus/Cache lib/MessageBus/Cache/JiftyDBI/Stash t

audreyt at bestpractical.com audreyt at bestpractical.com
Tue Oct 24 22:10:09 EDT 2006


Author: audreyt
Date: Tue Oct 24 22:10:08 2006
New Revision: 6266

Added:
   MessageBus/lib/MessageBus/Cache/JiftyDBI/
   MessageBus/lib/MessageBus/Cache/JiftyDBI.pm
   MessageBus/lib/MessageBus/Cache/JiftyDBI/Stash/
   MessageBus/lib/MessageBus/Cache/JiftyDBI/Stash.pm
   MessageBus/lib/MessageBus/Cache/JiftyDBI/Stash/Item.pm
   MessageBus/lib/MessageBus/Cache/JiftyDBI/Stash/Publisher.pm
Modified:
   MessageBus/Changes
   MessageBus/lib/MessageBus.pm
   MessageBus/lib/MessageBus/Cache/DBM_Deep.pm
   MessageBus/lib/MessageBus/Cache/Memcached.pm
   MessageBus/lib/MessageBus/Cache/PlainHash.pm
   MessageBus/t/basic.t

Log:
* 0.03

Modified: MessageBus/Changes
==============================================================================
--- MessageBus/Changes	(original)
+++ MessageBus/Changes	Tue Oct 24 22:10:08 2006
@@ -1,3 +1,9 @@
+[Changes for 0.03 - 2006-10-24]
+
+* New backend: JiftyDBI.
+* Multiple publishers now work in DBM_Deep and Memcached backends.
+* Memcached now atomically handles publisher announcement and removal.
+
 [Changes for 0.02 - 2006-10-24]
 
 * Thanks to mstrout++ this thing actually works now. :-)

Modified: MessageBus/lib/MessageBus.pm
==============================================================================
--- MessageBus/lib/MessageBus.pm	(original)
+++ MessageBus/lib/MessageBus.pm	Tue Oct 24 22:10:08 2006
@@ -1,5 +1,5 @@
 package MessageBus;
-$MessageBus::VERSION = '0.02';
+$MessageBus::VERSION = '0.03';
 
 use 5.005;
 use strict;

Modified: MessageBus/lib/MessageBus/Cache/DBM_Deep.pm
==============================================================================
--- MessageBus/lib/MessageBus/Cache/DBM_Deep.pm	(original)
+++ MessageBus/lib/MessageBus/Cache/DBM_Deep.pm	Tue Oct 24 22:10:08 2006
@@ -33,14 +33,14 @@
 
 sub add_publisher {
     my ($self, $chan, $pub) = @_;
-    my $pubs = $$self->get("$chan#") || {};
+    my $pubs = { %{ $$self->get("$chan#") || {} } };
     $pubs->{$pub} = 0;
     $$self->put("$chan#", $pubs);
 }
 
 sub remove_publisher {
     my ($self, $chan, $pub) = @_;
-    my $pubs = $$self->get("$chan#") || {};
+    my $pubs = { %{ $$self->get("$chan#") || {} } };
     delete $pubs->{$pub};
     $$self->put("$chan#", $pubs);
 }
@@ -52,7 +52,7 @@
 
 sub set_index {
     my ($self, $chan, $pub, $idx) = @_;
-    my $pubs = $$self->get("$chan#") || {};
+    my $pubs = { %{ $$self->get("$chan#") || {} } };
     $pubs->{$pub} = $idx;
     $$self->put("$chan#", $pubs);
 }

Added: MessageBus/lib/MessageBus/Cache/JiftyDBI.pm
==============================================================================
--- (empty file)
+++ MessageBus/lib/MessageBus/Cache/JiftyDBI.pm	Tue Oct 24 22:10:08 2006
@@ -0,0 +1,90 @@
+package MessageBus::Cache::JiftyDBI;
+use strict;
+use base 'MessageBus::Cacheable';
+use MessageBus::Cache::JiftyDBI::Stash;
+my %cache;
+
+use vars qw/$STASH/;
+
+sub new {
+    my $class = shift;
+    my $self = {};
+    bless $self => $class;
+    $STASH ||= MessageBus::Cache::JiftyDBI::Stash->new(@_);
+
+   return $self; 
+}
+
+
+
+sub fetch {
+    my $self  = shift;
+    my @keys_in_order = (@_);
+    my $items = MessageBus::Cache::JiftyDBI::Stash::ItemCollection->new( handle => $STASH->handle );
+    foreach my $val (@keys_in_order) {
+        $items->limit(
+            column           => 'key',
+            entry_aggregator => 'or',
+            value            => $val
+        );
+    }
+
+    my %items = map { $_->key, [$_->expiry, $_->val] } @{ $items->items_array_ref };
+    return @items{@keys_in_order};
+}
+
+sub store {
+    my ($self, $key, $val, $time, $expiry) = @_;
+    $expiry ||= 0;
+    my $item = MessageBus::Cache::JiftyDBI::Stash::Item->new(handle => $STASH->handle);
+    $item->create( key => $key, expiry => ($time+$expiry), val => $val);
+}
+
+sub publisher_indices {
+    my ($self, $chan) = @_;
+    my $publishers = MessageBus::Cache::JiftyDBI::Stash::PublisherCollection->new(handle => $STASH->handle);
+    $publishers->limit(column => 'channel', value => $chan);
+    
+    my %indices;
+    map {$indices{$_->name} = $_->idx} @{$publishers->items_array_ref};
+    return \%indices;
+
+}
+
+sub add_publisher {
+    my ($self, $chan, $pub) = @_;
+
+    my $publisher = MessageBus::Cache::JiftyDBI::Stash::Publisher->new(handle => $STASH->handle);
+    $publisher->create( channel => $chan, name => $pub, idx => 0);
+
+}
+
+sub remove_publisher {
+    my ($self, $chan, $pub) = @_;
+    my $publisher = _get_publisher($chan => $pub);
+    $publisher->delete();
+
+}
+
+sub get_index {
+    my ($self, $chan, $pub) = @_;
+    my $publisher =  _get_publisher($chan => $pub);
+    if ($publisher->id) {
+            return $publisher->idx
+        }
+    
+}
+
+sub set_index {
+    my ($self, $chan, $pub, $idx) = @_;
+    return _get_publisher($chan => $pub)->set_idx($idx);
+}
+
+sub _get_publisher {
+    my $chan = shift;
+    my $pub = shift;
+    my $publisher = MessageBus::Cache::JiftyDBI::Stash::Publisher->new(handle => $STASH->handle);
+    $publisher->load_by_cols( channel => $chan, name => $pub);
+    return $publisher;
+}
+1;

Added: MessageBus/lib/MessageBus/Cache/JiftyDBI/Stash.pm
==============================================================================
--- (empty file)
+++ MessageBus/lib/MessageBus/Cache/JiftyDBI/Stash.pm	Tue Oct 24 22:10:08 2006
@@ -0,0 +1,68 @@
+package MessageBus::Cache::JiftyDBI::Stash;
+use warnings;
+use strict;
+
+
+use Jifty::DBI::Handle;
+use Jifty::DBI::SchemaGenerator;
+         use File::Temp qw/ tempfile tempdir /;
+use MessageBus::Cache::JiftyDBI::Stash::Item;
+use MessageBus::Cache::JiftyDBI::Stash::Publisher;
+my $FILE=       $ENV{'HOME'}.  '/.messagebus.sqlite';
+
+sub new {
+    my $class = shift;
+    my $self  = {};
+    bless $self, $class;
+
+    my %args = (
+        db_init => 0,
+        db_config => undef,
+        @_
+    );
+    unless ( $args{'db_config'} ) {
+        my $filename;
+        ( undef, $filename ) = tempfile();
+
+        $args{'db_config'} = { driver => 'SQLite', database => $filename };
+    }
+
+    $self->_connect( %{$args{'db_config'}} );
+    if ( $args{'db_init'} ) {
+        $self->_generate_db();
+    }
+    $self;
+}
+
+sub handle {
+    my $self = shift;
+    $self->{'handle'} = shift if (@_);
+    return $self->{'handle'};
+}
+
+sub _generate_db {
+    my $self = shift;
+    my $gen = Jifty::DBI::SchemaGenerator->new( $self->handle );
+    $gen->add_model( MessageBus::Cache::JiftyDBI::Stash::Item->new( handle => $self->handle ) );
+    $gen->add_model( MessageBus::Cache::JiftyDBI::Stash::Publisher->new( handle => $self->handle ) );
+    my @statements = $gen->create_table_sql_statements;
+    $self->handle->begin_transaction;
+    for my $statement (@statements) {
+        my $ret = $self->handle->simple_query($statement);
+    }
+    $self->handle->commit;
+
+}
+
+
+sub _connect {
+    my $self = shift;
+
+
+    my $handle = Jifty::DBI::Handle->new();
+    $handle->connect(@_);
+    $self->handle($handle);
+}
+
+
+1;

Added: MessageBus/lib/MessageBus/Cache/JiftyDBI/Stash/Item.pm
==============================================================================
--- (empty file)
+++ MessageBus/lib/MessageBus/Cache/JiftyDBI/Stash/Item.pm	Tue Oct 24 22:10:08 2006
@@ -0,0 +1,23 @@
+package MessageBus::Cache::JiftyDBI::Stash::Item;
+
+use warnings;
+use strict;
+
+use Jifty::DBI::Schema;
+use Jifty::DBI::Record schema {
+    column key    => type is 'text';
+    column val    => type is 'blob', filters are 'Jifty::DBI::Filter::Storable';
+    column expiry => type is 'int';
+};
+
+package MessageBus::Cache::JiftyDBI::Stash::ItemCollection;
+use base qw/Jifty::DBI::Collection/;
+
+sub table {
+    my $self = shift;
+    my $tab = $self->new_item->table();
+    return $tab;
+}
+
+
+1;

Added: MessageBus/lib/MessageBus/Cache/JiftyDBI/Stash/Publisher.pm
==============================================================================
--- (empty file)
+++ MessageBus/lib/MessageBus/Cache/JiftyDBI/Stash/Publisher.pm	Tue Oct 24 22:10:08 2006
@@ -0,0 +1,23 @@
+package MessageBus::Cache::JiftyDBI::Stash::Publisher;
+
+use warnings;
+use strict;
+
+use Jifty::DBI::Schema;
+use Jifty::DBI::Record schema {
+    column channel    => type is 'text';
+    column name    => type is 'text';
+    column idx => type is 'int';
+};
+
+
+package MessageBus::Cache::JiftyDBI::Stash::PublisherCollection;
+use base qw/Jifty::DBI::Collection/;
+
+sub table {
+    my $self = shift;
+    my $tab = $self->new_item->table();
+    return $tab;
+}
+
+1;

Modified: MessageBus/lib/MessageBus/Cache/Memcached.pm
==============================================================================
--- MessageBus/lib/MessageBus/Cache/Memcached.pm	(original)
+++ MessageBus/lib/MessageBus/Cache/Memcached.pm	Tue Oct 24 22:10:08 2006
@@ -32,18 +32,35 @@
     $$self->get("$chan#") || {};
 }
 
+sub lock {
+    my ($self, $chan) = @_;
+    for my $i (1..10) {
+        return if $$self->add("$chan#lock" => 1);
+        sleep 1;
+    }
+}
+
+sub unlock {
+    my ($self, $chan) = @_;
+    $$self->delete("$chan#lock");
+}
+
 sub add_publisher {
     my ($self, $chan, $pub) = @_;
+    $self->lock($chan);
     my $pubs = $$self->get("$chan#") || {};
     $pubs->{$pub} = 0;
     $$self->set("$chan#", $pubs);
+    $self->unlock($chan);
 }
 
 sub remove_publisher {
     my ($self, $chan, $pub) = @_;
+    $self->lock($chan);
     my $pubs = $$self->get("$chan#") || {};
     delete $pubs->{$pub};
     $$self->set("$chan#", $pubs);
+    $self->unlock($chan);
 }
 
 sub get_index {

Modified: MessageBus/lib/MessageBus/Cache/PlainHash.pm
==============================================================================
--- MessageBus/lib/MessageBus/Cache/PlainHash.pm	(original)
+++ MessageBus/lib/MessageBus/Cache/PlainHash.pm	Tue Oct 24 22:10:08 2006
@@ -17,8 +17,8 @@
 }
 
 sub publisher_indices {
-    my ($self, $chan) = @_;
-    +{%{$cache{$chan}||{}}};
+    my ( $self, $chan ) = @_;
+    +{ %{ $cache{$chan} || {} } };
 }
 
 sub add_publisher {

Modified: MessageBus/t/basic.t
==============================================================================
--- MessageBus/t/basic.t	(original)
+++ MessageBus/t/basic.t	Tue Oct 24 22:10:08 2006
@@ -1,17 +1,28 @@
 use strict;
-use Test::More tests => 18;
+use warnings;
+use Test::More;
 use MessageBus;
 use IO::Socket::INET;
 
-my @backends = qw(PlainHash DBM_Deep Memcached);
+my @backends = qw(PlainHash);
+
+unshift @backends, 'DBM_Deep' if eval { require DBM::Deep };
+unshift @backends, 'JiftyDBI' if eval { require Jifty::DBI };
+unshift @backends, 'Memcached' if eval { require Cache::Memcached };
+
+plan tests => 6 * scalar @backends;
 
 SKIP: for my $backend (@backends) {
+    my %init_args = ();
+    diag('Testing backend '.$backend);
     if ($backend eq 'Memcached') {
         my $sock = IO::Socket::INET->new('127.0.0.1:11211')
             or skip("Memcached not started", 6);
+    } elsif ($backend eq 'JiftyDBI') {
+        $init_args{db_init} = 1;
     }
 
-    my $bus = MessageBus->new($backend);
+    my $bus = MessageBus->new($backend, %init_args);
 
     my @sub; $sub[0] = $bus->subscribe;
 
@@ -24,12 +35,12 @@
 
     $sub[1] = $bus->subscribe;
 
-    $pub->msg('bar');
+    $pub->msg(['bar', 'bar']);
     $pub->msg('baz');
 
-    is_deeply([$sub[0]->get], [qw< foo bar baz >], 'get worked');
+    is_deeply([$sub[0]->get], ['foo', ['bar', 'bar'], 'baz'], 'get worked');
     is_deeply([$sub[0]->get], [], 'get emptied the cache');
 
-    is_deeply([map {$_->[1]} @{$sub[1]->get_all->{''}}], [qw< bar baz >], 'get_all worked');
+    is_deeply([map {$_->[1]} @{$sub[1]->get_all->{''}}], [['bar', 'bar'], 'baz'], 'get_all worked');
     is_deeply([map {$_->[1]} @{$sub[1]->get_all->{''}}], [], 'get_all emptied the cache');
 }


More information about the Rt-commit mailing list