[Rt-commit] r6260 - in MessageBus: lib lib/MessageBus
lib/MessageBus/Cache t
audreyt at bestpractical.com
audreyt at bestpractical.com
Tue Oct 24 19:53:40 EDT 2006
Author: audreyt
Date: Tue Oct 24 19:53:38 2006
New Revision: 6260
Added:
MessageBus/Makefile.PL
MessageBus/lib/
MessageBus/lib/MessageBus/
MessageBus/lib/MessageBus.pm
MessageBus/lib/MessageBus/Cache/
MessageBus/lib/MessageBus/Cache/DBM_Deep.pm
MessageBus/lib/MessageBus/Cache/Memcached.pm
MessageBus/lib/MessageBus/Cache/PlainHash.pm
MessageBus/lib/MessageBus/Cacheable.pm
MessageBus/lib/MessageBus/Pub.pm
MessageBus/lib/MessageBus/Sub.pm
MessageBus/t/
MessageBus/t/basic.t
Log:
* MessageBus 0.01
Added: MessageBus/Makefile.PL
==============================================================================
--- (empty file)
+++ MessageBus/Makefile.PL Tue Oct 24 19:53:38 2006
@@ -0,0 +1,12 @@
+use strict;
+use inc::Module::Install;
+
+# Distribution metadata.  all_from reads the remaining metadata (such as
+# the version) out of the main module.
+name 'MessageBus';
+license 'MIT';
+all_from 'lib/MessageBus.pm';
+
+# Runtime prerequisites.
+requires 'DBM::Deep';
+requires 'Data::UUID';
+requires 'Class::InsideOut';
+# MessageBus::Cacheable imports time() from Time::HiRes, which is not
+# bundled with every perl, so declare it explicitly.
+requires 'Time::HiRes';
+
+# NOTE: Cache::Memcached is used only by the Memcached backend and is not
+# listed here as a hard requirement.
+
+sign; WriteAll;
Added: MessageBus/lib/MessageBus.pm
==============================================================================
--- (empty file)
+++ MessageBus/lib/MessageBus.pm Tue Oct 24 19:53:38 2006
@@ -0,0 +1,113 @@
+package MessageBus;
+$MessageBus::VERSION = '0.01';
+
+use 5.005;
+use strict;
+use Class::InsideOut qw( public private register id );
+use MessageBus::Pub;
+use MessageBus::Sub;
+
+# Inside-out attribute: object id => cache backend used by this bus.
+private cache => my %cache;
+
+# MessageBus->new([$backend [, @backend_args]])
+#
+# Create a bus on top of the cache backend MessageBus::Cache::$backend
+# (default: PlainHash).  Any remaining arguments are handed to the backend's
+# own constructor.  Dies when the backend module cannot be loaded.
+sub new {
+    my $id = id( my $self = register( bless \(my $s), shift ) );
+
+    my $backend = shift || 'PlainHash';
+
+    # The module name must come from $backend.  Interpolating $_ here would
+    # only work when the caller happens to have the backend name in $_.
+    local $@;
+    eval { require "MessageBus/Cache/$backend.pm" }
+        or die "Cannot find backend module: MessageBus::Cache::$backend";
+
+    $cache{$id} = "MessageBus::Cache::$backend"->new(@_);
+    return $self;
+}
+
+# $bus->publish(@channels)
+#
+# Return a new MessageBus::Pub bound to this bus's cache.  With no
+# arguments the publisher uses the single channel '' (empty string).
+sub publish {
+    my $id = id(my $self = shift);
+    MessageBus::Pub->new($cache{$id}, @_ ? @_ : '');
+}
+
+# $bus->subscribe(@channels)
+#
+# Return a new MessageBus::Sub bound to this bus's cache.  With no
+# arguments the subscriber uses the single channel '' (empty string).
+sub subscribe {
+    my $id = id(my $self = shift);
+    MessageBus::Sub->new($cache{$id}, @_ ? @_ : '');
+}
+
+1;
+
+__END__
+
+=head1 NAME
+
+MessageBus - Lightweight publish/subscribe messaging system
+
+=head1 SYNOPSIS
+
+ # A new message bus with the DBM::Deep backend
+ # (Other possible backends include Memcached and PlainHash)
+ my $bus = MessageBus->new(DBM_Deep => '/tmp/bus.db');
+
+ # A channel is any arbitrary string
+ my $channel = '#perl6';
+
+ # Register a new publisher (you can publish to multiple channels)
+ my $pub = $bus->publish("#perl6", "#moose");
+
+ # Publish a message (may be a complex object) to those channels
+ $pub->msg("This is a message");
+
+ # Register a new subscriber (you can subscribe to multiple channels)
+ my $sub = $bus->subscribe("#moose");
+
+ # Publish another message to the same channels
+ $pub->msg("This is another message");
+
+ # Simple get: Returns the messages sent since the previous get,
+ # but only for the first channel.
+ my @msgs = $sub->get;
+
+ # Simple get, with an explicit channel key (must be among the ones
+ # it initially subscribed to)
+ my @msgs = $sub->get("#moose");
+
+ # Complex get: Returns a hash reference mapping each subscribed channel
+ # to an array reference of [timestamp, message] pairs.
+ my $hash_ref = $sub->get_all;
+
+=head1 DESCRIPTION
+
+This module provides a simple message bus for publishing messages and
+subscribing to them.
+
+Currently it offers three backends: C<DBM_Deep> for on-disk storage,
+C<Memcached> for possibly multi-host storage, and C<PlainHash> for
+single-process storage.
+
+Please see the tests in F<t/> for this distribution, as well as L</SYNOPSIS>
+above, for some usage examples; detailed documentation is not yet available.
+
+=head1 AUTHORS
+
+Audrey Tang E<lt>cpan at audreyt.orgE<gt>
+
+=head1 COPYRIGHT (The "MIT" License)
+
+Copyright 2002-2006 by Audrey Tang <cpan at audreyt.org>.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is fur-
+nished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FIT-
+NESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE X
+CONSORTIUM BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+=cut
Added: MessageBus/lib/MessageBus/Cache/DBM_Deep.pm
==============================================================================
--- (empty file)
+++ MessageBus/lib/MessageBus/Cache/DBM_Deep.pm Tue Oct 24 19:53:38 2006
@@ -0,0 +1,60 @@
+package MessageBus::Cache::DBM_Deep;
+use strict;
+use base 'MessageBus::Cacheable';
+use DBM::Deep;
+use File::Temp qw/ tempfile /;
+
+# Cache backend that keeps messages and publisher indices in a DBM::Deep
+# file.  The object is a blessed reference to the DBM::Deep handle.
+#
+# Key layout:
+#   "$chan-$pub-$index"  => [$time, $value]          (one entry per message)
+#   "$chan#"             => { $pub => $index, ... }  (publishers of $chan)
+
+# new($class [, $file])
+# Open (or create) the database in $file; without a file name a temporary
+# file from default_config is used.
+sub new {
+    my $class = shift;
+    my $file  = shift;
+    my $mem   = DBM::Deep->new($file || $class->default_config);
+    bless(\$mem, $class);
+}
+
+# Return the name of a fresh temporary file, created with UNLINK => 1.
+sub default_config {
+    my (undef, $filename) = tempfile(UNLINK => 1);
+    return $filename;
+}
+
+# fetch(@keys): return the stored [$time, $value] entry for each key.
+sub fetch {
+    my $self = shift;
+    map { $$self->get($_) } @_;
+}
+
+# store($key, $val, $time, $expiry): save [$time, $val] under $key.
+# $expiry is accepted for interface compatibility but not used here.
+sub store {
+    my ($self, $key, $val, $time, $expiry) = @_;
+    $$self->put($key, [$time, $val]);
+}
+
+# Private helper: return a plain-hash copy of the publisher => index map
+# stored under "$chan#" (an empty hash when the channel has none yet).
+# Working on a copy avoids handing a value fetched from the database
+# straight back to put() for the very same key.
+sub _publishers {
+    my ($self, $chan) = @_;
+    return { %{ $$self->get("$chan#") || {} } };
+}
+
+# publisher_indices($chan): snapshot of publisher => last index for $chan.
+sub publisher_indices {
+    my ($self, $chan) = @_;
+    return $self->_publishers($chan);
+}
+
+# add_publisher($chan, $pub): register $pub on $chan with index 0.
+# The map is read from the same "$chan#" key it is written to, so
+# publishers that are already registered are preserved.
+sub add_publisher {
+    my ($self, $chan, $pub) = @_;
+    my $pubs = $self->_publishers($chan);
+    $pubs->{$pub} = 0;
+    $$self->put("$chan#", $pubs);
+}
+
+# remove_publisher($chan, $pub): drop $pub from $chan's publisher map.
+sub remove_publisher {
+    my ($self, $chan, $pub) = @_;
+    my $pubs = $self->_publishers($chan);
+    delete $pubs->{$pub};
+    $$self->put("$chan#", $pubs);
+}
+
+# get_index($chan, $pub): last message index of $pub on $chan, or undef.
+sub get_index {
+    my ($self, $chan, $pub) = @_;
+    ($$self->get("$chan#") || {})->{$pub};
+}
+
+# set_index($chan, $pub, $idx): record $idx as the last message index of
+# $pub on $chan, leaving the other publishers' indices untouched.
+sub set_index {
+    my ($self, $chan, $pub, $idx) = @_;
+    my $pubs = $self->_publishers($chan);
+    $pubs->{$pub} = $idx;
+    $$self->put("$chan#", $pubs);
+}
+
+1;
Added: MessageBus/lib/MessageBus/Cache/Memcached.pm
==============================================================================
--- (empty file)
+++ MessageBus/lib/MessageBus/Cache/Memcached.pm Tue Oct 24 19:53:38 2006
@@ -0,0 +1,61 @@
+package MessageBus::Cache::Memcached;
+use strict;
+use base 'MessageBus::Cacheable';
+use Cache::Memcached;
+
+# Cache backend that keeps messages and publisher indices in memcached.
+# The object is a blessed reference to the Cache::Memcached client.
+#
+# Key layout:
+#   "$chan-$pub-$index"  => [$time, $value]          (one entry per message)
+#   "$chan#"             => { $pub => $index, ... }  (publishers of $chan)
+#
+# NOTE(review): the publisher map is updated with a get followed by a set,
+# which is presumably not atomic across concurrent clients -- confirm
+# whether that matters for the intended deployment.
+
+# new($class [, $config])
+# $config is passed to Cache::Memcached->new; default_config is used when
+# it is omitted.
+sub new {
+    my $class = shift;
+    my $config = shift || $class->default_config;
+    my $mem = Cache::Memcached->new($config);
+    bless(\$mem, $class);
+}
+
+# Default client configuration: one server on localhost, debugging off.
+sub default_config {
+    return {
+        servers => ['127.0.0.1:11211'],
+        debug => 0
+    };
+}
+
+# fetch(@keys): return the stored [$time, $value] entries found for @keys.
+# The entries come back as hash values, so their order is unspecified.
+sub fetch {
+    my $self = shift;
+    values(%{$$self->get_multi(@_)});
+}
+
+# store($key, $val, $time, $expiry): save [$time, $val] under $key, passing
+# $expiry through to memcached.
+sub store {
+    my ($self, $key, $val, $time, $expiry) = @_;
+    $$self->set($key, [$time, $val], $expiry);
+}
+
+# publisher_indices($chan): publisher => last index map for $chan
+# (an empty hash when the channel has no publishers yet).
+sub publisher_indices {
+    my ($self, $chan) = @_;
+    $$self->get("$chan#") || {};
+}
+
+# add_publisher($chan, $pub): register $pub on $chan with index 0.
+# The map is read from the same "$chan#" key it is written to, so
+# publishers that are already registered are preserved.
+sub add_publisher {
+    my ($self, $chan, $pub) = @_;
+    my $pubs = $$self->get("$chan#") || {};
+    $pubs->{$pub} = 0;
+    $$self->set("$chan#", $pubs);
+}
+
+# remove_publisher($chan, $pub): drop $pub from $chan's publisher map.
+sub remove_publisher {
+    my ($self, $chan, $pub) = @_;
+    my $pubs = $$self->get("$chan#") || {};
+    delete $pubs->{$pub};
+    $$self->set("$chan#", $pubs);
+}
+
+# get_index($chan, $pub): last message index of $pub on $chan, or undef.
+sub get_index {
+    my ($self, $chan, $pub) = @_;
+    ($$self->get("$chan#") || {})->{$pub};
+}
+
+# set_index($chan, $pub, $idx): record $idx as the last message index of
+# $pub on $chan, leaving the other publishers' indices untouched.
+sub set_index {
+    my ($self, $chan, $pub, $idx) = @_;
+    my $pubs = $$self->get("$chan#") || {};
+    $pubs->{$pub} = $idx;
+    $$self->set("$chan#", $pubs);
+}
+
+1;
Added: MessageBus/lib/MessageBus/Cache/PlainHash.pm
==============================================================================
--- (empty file)
+++ MessageBus/lib/MessageBus/Cache/PlainHash.pm Tue Oct 24 19:53:38 2006
@@ -0,0 +1,44 @@
+package MessageBus::Cache::PlainHash;
+use strict;
+use base 'MessageBus::Cacheable';
+
+# In-process storage shared by every user of this backend.  It holds two
+# kinds of entries: message keys mapping to [time, value] pairs, and channel
+# names mapping to a hash of publisher => index.
+my %cache;
+
+# There is no per-instance state: new() simply yields the class name, and
+# the methods below are then invoked on that name.
+use constant new => __PACKAGE__;
+
+# fetch(@keys): the stored entries for @keys, in the order requested.
+sub fetch {
+    my ($self, @keys) = @_;
+    return @cache{@keys};
+}
+
+# store($key, $val, $time, $expiry): remember [$time, $val] under $key.
+# $expiry is accepted but ignored by this backend.
+sub store {
+    my ($self, $key, $val, $time, $expiry) = @_;
+    return $cache{$key} = [ $time, $val ];
+}
+
+# publisher_indices($chan): a shallow copy of $chan's publisher => index map.
+sub publisher_indices {
+    my ($self, $chan) = @_;
+    my $known = $cache{$chan} || {};
+    return { %$known };
+}
+
+# add_publisher($chan, $pub): register $pub on $chan, starting at index 0.
+sub add_publisher {
+    my ($self, $chan, $pub) = @_;
+    return $cache{$chan}{$pub} = 0;
+}
+
+# remove_publisher($chan, $pub): forget $pub on $chan.
+sub remove_publisher {
+    my ($self, $chan, $pub) = @_;
+    return delete $cache{$chan}{$pub};
+}
+
+# get_index($chan, $pub): last message index recorded for $pub on $chan.
+sub get_index {
+    my ($self, $chan, $pub) = @_;
+    return $cache{$chan}{$pub};
+}
+
+# set_index($chan, $pub, $idx): record $idx for $pub on $chan.
+sub set_index {
+    my ($self, $chan, $pub, $idx) = @_;
+    return $cache{$chan}{$pub} = $idx;
+}
+
+1;
Added: MessageBus/lib/MessageBus/Cacheable.pm
==============================================================================
--- (empty file)
+++ MessageBus/lib/MessageBus/Cacheable.pm Tue Oct 24 19:53:38 2006
@@ -0,0 +1,38 @@
+package MessageBus::Cacheable;
+use strict;
+use Time::HiRes 'time';
+
+# Base class for cache backends.  A backend is expected to supply the
+# storage primitives listed here; get() and put() below are built on them.
+#
+#method fetch (Str *@keys --> List of Pair) { ... }
+#method store (Str $key, Str $val, Num $time, Num $expiry) { ... }
+
+#method add_publisher (Str $chan, Str $pub) { ... }
+#method remove_publisher (Str $chan, Str $pub) { ... }
+
+#method get_index (Str $chan, Str $pub --> Int) { ... }
+#method set_index (Str $chan, Str $pub, Int $index) { ... }
+
+#method publisher_indices (Str $chan --> Hash of Int) { ... }
+
+# get($chan, $orig, $curr)
+#
+# $orig and $curr are publisher => index hashes.  For every publisher in
+# $curr, the messages numbered from one past its index in $orig up to its
+# index in $curr are fetched; the entries are returned sorted by their
+# first element (the time passed to store).
+sub get {
+    my ($self, $chan, $orig, $curr) = @_;
+
+    # A publisher missing from $orig counts as index 0.
+    no warnings 'uninitialized';
+
+    my @keys;
+    for my $pub (keys %$curr) {
+        my $first = $orig->{$pub} + 1;
+        my $last  = $curr->{$pub};
+        push @keys, map { "$chan-$pub-$_" } $first .. $last;
+    }
+
+    return sort { $a->[0] <=> $b->[0] } $self->fetch(@keys);
+}
+
+# put($chan, $pub, $msg, $expiry)
+#
+# Store $msg as the next message of publisher $pub on $chan, stamped with
+# the current high-resolution time, then advance the publisher's index.
+sub put {
+    my ($self, $chan, $pub, $msg, $expiry) = @_;
+    my $next = 1 + $self->get_index($chan, $pub);
+    my $key  = "$chan-$pub-$next";
+    $self->store($key, $msg, time, $expiry);
+    return $self->set_index($chan, $pub, $next);
+}
+
+1;
Added: MessageBus/lib/MessageBus/Pub.pm
==============================================================================
--- (empty file)
+++ MessageBus/lib/MessageBus/Pub.pm Tue Oct 24 19:53:38 2006
@@ -0,0 +1,32 @@
+package MessageBus::Pub;
+use strict;
+use Data::UUID;
+use Class::InsideOut qw( public private register id );
+
+# Inside-out attributes, keyed by object id:
+#   chan  - array ref of the channels this publisher posts to (has a
+#           public accessor)
+#   uuid  - identifier under which this publisher is registered in the cache
+#   cache - the cache backend shared with the owning bus
+public chan => my %chan;
+private uuid => my %uuid;
+private cache => my %cache;
+
+# MessageBus::Pub->new($cache, @channels)
+#
+# Create a publisher with a fresh UUID and register it with $cache on each
+# of @channels.
+sub new {
+    my ($class, $cache, @chan) = @_;
+    my $id = id( my $self = register( bless \(my $s), $class ) );
+    $chan{$id} = \@chan;
+    $uuid{$id} = Data::UUID->new->create_b64;
+    $cache{$id} = $cache;
+    $cache->add_publisher($_, $uuid{$id}) for @chan;
+    return $self;
+}
+
+# $pub->msg($msg): publish $msg to every channel of this publisher.
+sub msg {
+    my $id = id(my $self = shift);
+    my $msg = shift;
+    $cache{$id}->put($_, $uuid{$id}, $msg) for @{$chan{$id}};
+}
+
+# Destruction hook: unregister this publisher from all of its channels.
+#
+# This is deliberately DEMOLISH rather than DESTROY.  Class::InsideOut
+# installs its own DESTROY, which calls DEMOLISH and then clears this
+# object's entries from the inside-out hashes above; defining DESTROY here
+# would replace that method and leave those entries behind.
+sub DEMOLISH {
+    my $id = id(my $self = shift);
+
+    # Nothing to unregister if the object was never fully set up.
+    my $cache = $cache{$id} or return;
+    $cache->remove_publisher($_, $uuid{$id}) for @{ $chan{$id} || [] };
+}
+
+1;
Added: MessageBus/lib/MessageBus/Sub.pm
==============================================================================
--- (empty file)
+++ MessageBus/lib/MessageBus/Sub.pm Tue Oct 24 19:53:38 2006
@@ -0,0 +1,41 @@
+package MessageBus::Sub;
+
+use strict;
+use Class::InsideOut qw( public private register id );
+
+# Inside-out attributes, keyed by object id:
+#   chan  - array ref of subscribed channels (has a public accessor)
+#   pubs  - per channel, the publisher => index snapshot taken at the last
+#           read (or at construction)
+#   cache - the cache backend shared with the owning bus
+public chan => my %chan;
+private pubs => my %pubs;
+private cache => my %cache;
+
+# MessageBus::Sub->new($cache, @channels)
+#
+# Create a subscriber and snapshot the current publisher indices of every
+# channel, so that later reads only see messages published afterwards.
+sub new {
+    my ($class, $cache, @chan) = @_;
+    my $self = register( bless \(my $s), $class );
+    my $id   = id($self);
+
+    $pubs{$id}  = { map {; $_ => $cache->publisher_indices($_) } @chan };
+    $cache{$id} = $cache;
+    $chan{$id}  = \@chan;
+    return $self;
+}
+
+# $sub->get_all
+#
+# For every subscribed channel, collect the entries published since the
+# previous read and advance the snapshot.  Returns a hash reference of
+# channel => array ref of the entries returned by the cache's get().
+sub get_all {
+    my $id = id(my $self = shift);
+
+    my %entries_for;
+    for my $chan (@{ $chan{$id} }) {
+        my $orig = $pubs{$id}{$chan};
+        my $curr = $pubs{$id}{$chan} = $cache{$id}->publisher_indices($chan);
+        $entries_for{$chan} = [ $cache{$id}->get($chan, $orig, $curr) ];
+    }
+    return \%entries_for;
+}
+
+# $sub->get([$chan])
+#
+# Return the message payloads published on $chan (default: the first
+# subscribed channel) since the previous read, advancing the snapshot.
+# Yields a list in list context and an array reference otherwise.
+sub get {
+    my $id = id(my $self = shift);
+    my $chan = @_ ? shift : $chan{$id}[0];
+
+    my $orig = $pubs{$id}{$chan};
+    my $curr = $pubs{$id}{$chan} = $cache{$id}->publisher_indices($chan);
+
+    my @payloads = map { $_->[1] } $cache{$id}->get($chan, $orig, $curr);
+    return wantarray ? @payloads : \@payloads;
+}
+
+1;
Added: MessageBus/t/basic.t
==============================================================================
--- (empty file)
+++ MessageBus/t/basic.t Tue Oct 24 19:53:38 2006
@@ -0,0 +1,35 @@
+use strict;
+use Test::More tests => 18;
+use MessageBus;
+use IO::Socket::INET;
+
+# Every backend runs the same six assertions (3 backends x 6 = 18 planned).
+my @backends = qw(PlainHash DBM_Deep Memcached);
+
+# $_ holds the backend name for the whole iteration.
+for (@backends) {
+    # The SKIP block sits inside the loop so that skipping one backend
+    # leaves only this iteration instead of ending the whole loop.
+    SKIP: {
+        if ($_ eq 'Memcached') {
+            # Probe the default memcached address; skip if nothing answers.
+            IO::Socket::INET->new('127.0.0.1:11211')
+                or skip("Memcached not started", 6);
+        }
+
+        my $bus = MessageBus->new($_);
+
+        # First subscriber joins before anything has been published.
+        my @sub; $sub[0] = $bus->subscribe;
+
+        is_deeply([map {$_->[1]} @{$sub[0]->get_all->{''}}], [], 'get_all worked when there is no pubs');
+        is_deeply([$sub[0]->get], [], 'get worked when there is no pubs');
+
+        my $pub = $bus->publish;
+
+        $pub->msg('foo');
+
+        # Second subscriber joins after 'foo' and must not see it.
+        $sub[1] = $bus->subscribe;
+
+        $pub->msg('bar');
+        $pub->msg('baz');
+
+        is_deeply([$sub[0]->get], [qw< foo bar baz >], 'get worked');
+        is_deeply([$sub[0]->get], [], 'get emptied the cache');
+
+        is_deeply([map {$_->[1]} @{$sub[1]->get_all->{''}}], [qw< bar baz >], 'get_all worked');
+        is_deeply([map {$_->[1]} @{$sub[1]->get_all->{''}}], [], 'get_all emptied the cache');
+    }
+}
More information about the Rt-commit
mailing list