[Bps-public-commit] r17053 - Prophet/trunk/lib/Prophet/Replica
jesse at bestpractical.com
jesse at bestpractical.com
Sat Nov 29 16:35:23 EST 2008
Author: jesse
Date: Sat Nov 29 16:35:22 2008
New Revision: 17053
Modified:
Prophet/trunk/lib/Prophet/Replica/prophet.pm
Log:
* perltidy prophet.pm
Modified: Prophet/trunk/lib/Prophet/Replica/prophet.pm
==============================================================================
--- Prophet/trunk/lib/Prophet/Replica/prophet.pm (original)
+++ Prophet/trunk/lib/Prophet/Replica/prophet.pm Sat Nov 29 16:35:22 2008
@@ -3,9 +3,9 @@
extends 'Prophet::Replica';
use Params::Validate qw(:all);
use LWP::Simple ();
-use File::Spec ();
+use File::Spec ();
use File::Path;
-use Cwd ();
+use Cwd ();
use Digest::SHA1 qw(sha1_hex);
use File::Find::Rule;
use Data::UUID;
@@ -13,16 +13,12 @@
use JSON;
use POSIX qw();
-
has '+db_uuid' => (
lazy => 1,
default => sub { shift->_read_file('database-uuid') },
);
-has _uuid => (
- is => 'rw',
-);
-
+has _uuid => ( is => 'rw', );
has replica_version => (
is => 'ro',
@@ -50,9 +46,7 @@
},
);
-has current_edit => (
- is => 'rw',
-);
+has current_edit => ( is => 'rw', );
has current_edit_records => (
metaclass => 'Collection::Array',
@@ -61,28 +55,30 @@
default => sub { [] },
);
-
has '+resolution_db_handle' => (
isa => 'Prophet::Replica | Undef',
lazy => 1,
default => sub {
my $self = shift;
return if $self->is_resdb || $self->is_state_handle;
- return Prophet::Replica->new({
- url => "prophet:" . $self->url . '/resolutions',
- app_handle => $self->app_handle,
- is_resdb => 1,
- })
+ return Prophet::Replica->new(
+ { url => "prophet:" . $self->url . '/resolutions',
+ app_handle => $self->app_handle,
+ is_resdb => 1,
+ }
+ );
},
);
-use constant scheme => 'prophet';
-use constant cas_root => 'cas';
-use constant record_cas_dir => File::Spec->catdir( __PACKAGE__->cas_root => 'records' );
-use constant changeset_cas_dir => File::Spec->catdir( __PACKAGE__->cas_root => 'changesets' );
-use constant record_dir => 'records';
-use constant userdata_dir => 'userdata';
-use constant changeset_index => 'changesets.idx';
+use constant scheme => 'prophet';
+use constant cas_root => 'cas';
+use constant record_cas_dir =>
+ File::Spec->catdir( __PACKAGE__->cas_root => 'records' );
+use constant changeset_cas_dir =>
+ File::Spec->catdir( __PACKAGE__->cas_root => 'changesets' );
+use constant record_dir => 'records';
+use constant userdata_dir => 'userdata';
+use constant changeset_index => 'changesets.idx';
=head1 Replica Format
@@ -212,9 +208,9 @@
sub BUILD {
my $self = shift;
my $args = shift;
- Carp::cluck() unless ($args->{app_handle});
- for ($self->{url} ) {
- s/^prophet://; # url-based constructor in ::replica should do better
+ Carp::cluck() unless ( $args->{app_handle} );
+ for ( $self->{url} ) {
+ s/^prophet://; # url-based constructor in ::replica should do better
s{/$}{};
}
@@ -231,7 +227,7 @@
sub replica_exists {
my $self = shift;
- return $self->replica_version ? 1 :0;
+ return $self->replica_version ? 1 : 0;
}
=head2 set_replica_version
@@ -255,15 +251,14 @@
}
sub can_initialize {
- my $self = shift;
- if ( $self->fs_root_parent && -w $self->fs_root_parent ) {
+ my $self = shift;
+ if ( $self->fs_root_parent && -w $self->fs_root_parent ) {
return 1;
}
return 0;
}
-
use constant can_read_records => 1;
use constant can_read_changesets => 1;
sub can_write_changesets { return ( shift->fs_root ? 1 : 0 ) }
@@ -271,10 +266,12 @@
sub initialize {
my $self = shift;
- my %args = validate(@_, {
- db_uuid => 0,
- resdb_uuid => 0,
- });
+ my %args = validate(
+ @_,
+ { db_uuid => 0,
+ resdb_uuid => 0,
+ }
+ );
if ( !$self->fs_root_parent ) {
@@ -293,17 +290,14 @@
return if $self->replica_exists;
- for (
+ for (
$self->record_dir, $self->cas_root,
$self->record_cas_dir, $self->changeset_cas_dir,
$self->userdata_dir
- ) {
- mkpath([File::Spec->catdir($self->fs_root => $_)]);
- }
-
-
-
-;
+ )
+ {
+ mkpath( [ File::Spec->catdir( $self->fs_root => $_ ) ] );
+ }
$self->set_db_uuid( $args{'db_uuid'} || Data::UUID->new->create_str );
$self->set_latest_sequence_no("0");
@@ -311,7 +305,7 @@
$self->set_replica_version(1);
- $self->resolution_db_handle->initialize(db_uuid => $args{resdb_uuid})
+ $self->resolution_db_handle->initialize( db_uuid => $args{resdb_uuid} )
if !$self->is_resdb;
$self->after_initialize->($self);
@@ -374,8 +368,8 @@
=cut
sub _write_record {
- my $self = shift;
- my %args = validate( @_, { record => { isa => 'Prophet::Record' }, } );
+ my $self = shift;
+ my %args = validate( @_, { record => { isa => 'Prophet::Record' }, } );
my $record = $args{'record'};
$self->_write_serialized_record(
@@ -398,17 +392,19 @@
cas_dir => $self->record_cas_dir
);
- my $record = {uuid => $args{uuid},
+ my $record = {
+ uuid => $args{uuid},
type => $args{type},
- cas_key => $cas_key};
+ cas_key => $cas_key
+ };
$self->_prepare_record_index_update(
- uuid => $args{uuid},
+ uuid => $args{uuid},
type => $args{type},
- cas_key => $cas_key);
+ cas_key => $cas_key
+ );
}
-
sub _prepare_record_index_update {
my $self = shift;
my %record = (@_);
@@ -418,6 +414,7 @@
push @{ $self->current_edit_records }, \%record;
} else {
+
# If we're not inside an edit, we're likely exporting the replica
# TODO: the replica exporter code should probably be retooled
$self->_write_record_index_entry(%record);
@@ -428,23 +425,25 @@
use constant RECORD_INDEX_SIZE => ( 4 + 20 );
sub _write_record_index_entry {
- my $self = shift;
- my %args = validate( @_, { type => 1, uuid => 1, cas_key => 1, changeset_id => 0 } );
+ my $self = shift;
+ my %args = validate( @_,
+ { type => 1, uuid => 1, cas_key => 1, changeset_id => 0 } );
my $idx_filename = $self->_record_index_filename(
uuid => $args{uuid},
type => $args{type}
);
my $index_path = File::Spec->catfile( $self->fs_root, $idx_filename );
- my (undef,$parent, $filename) = File::Spec->splitpath($index_path);
- mkpath([$parent]);
+ my ( undef, $parent, $filename ) = File::Spec->splitpath($index_path);
+ mkpath( [$parent] );
- open( my $record_index, ">>" . $index_path);
+ open( my $record_index, ">>" . $index_path );
# XXX TODO: skip if the index already has this version of the record;
# XXX TODO FETCH THAT
my $record_last_changed_changeset = $args{'changeset_id'} || 0;
- my $index_row = pack( 'NH40', $record_last_changed_changeset, $args{cas_key} );
+ my $index_row
+ = pack( 'NH40', $record_last_changed_changeset, $args{cas_key} );
print $record_index $index_row || die $!;
close $record_index;
}
@@ -458,11 +457,10 @@
type => $args{type},
uuid => $args{uuid}
);
- return @{$entries[-1] || []};
+ return @{ $entries[-1] || [] };
}
-
sub _read_record_index {
my $self = shift;
my %args = validate( @_, { type => 1, uuid => 1 } );
@@ -479,12 +477,8 @@
my $count = length($index) / RECORD_INDEX_SIZE;
my @entries;
for my $offset ( 0 .. ( $count - 1 ) ) {
- my ( $seq, $key ) = unpack(
- 'NH40',
- substr(
- $index, ( $offset ) * RECORD_INDEX_SIZE,
- RECORD_INDEX_SIZE
- )
+ my ( $seq, $key ) = unpack( 'NH40',
+ substr( $index, ($offset) * RECORD_INDEX_SIZE, RECORD_INDEX_SIZE )
);
push @entries, [ $seq => $key ];
}
@@ -520,31 +514,31 @@
my %args = validate( @_, { uuid => 1, type => 1 } );
return File::Spec->catfile(
$self->_record_type_root( $args{'type'} ),
- $self->_hashed_dir_name($args{uuid})
+ $self->_hashed_dir_name( $args{uuid} )
);
}
-
sub _hashed_dir_name {
my $self = shift;
my $hash = shift;
- return (substr( $hash, 0, 1 ), substr( $hash, 1, 1 ), $hash);
+ return ( substr( $hash, 0, 1 ), substr( $hash, 1, 1 ), $hash );
}
-
sub _record_cas_filename {
- my $self = shift;
- my %args = validate( @_, { type => 1, uuid => 1 } );
+ my $self = shift;
+ my %args = validate( @_, { type => 1, uuid => 1 } );
- my ($seq,$key) = $self->_read_record_index_entry( type => $args{'type'}, uuid => $args{'uuid'});
+ my ( $seq, $key ) = $self->_read_record_index_entry(
+ type => $args{'type'},
+ uuid => $args{'uuid'}
+ );
+
+ return undef unless ( $key and ( $key ne '0' x 40 ) );
- return undef unless ($key and ($key ne '0'x40));
# XXX: deserialize the changeset content from the cas with $key
- my $casfile = File::Spec->catfile(
- $self->record_cas_dir,
- $self->_hashed_dir_name($key)
- );
+ my $casfile = File::Spec->catfile( $self->record_cas_dir,
+ $self->_hashed_dir_name($key) );
return $casfile;
}
@@ -568,7 +562,7 @@
# XXX TODO: we should not be calculating the changeset's sha1 with the 'replica_uuid' and 'sequence_no' inside it. that makes every replica have a different hash for what should be the samechangeset.
# These ttwo things should never actually get stored
- my $seqno = delete $hash_changeset->{'sequence_no'};
+ my $seqno = delete $hash_changeset->{'sequence_no'};
my $uuid = delete $hash_changeset->{'replica_uuid'};
my $cas_key = $self->_write_to_cas(
@@ -599,39 +593,36 @@
use constant CHG_RECORD_SIZE => ( 4 + 16 + 4 + 20 );
-
sub _get_changeset_index_entry {
my $self = shift;
- my %args = validate(@_, { sequence_no => 1, index_file => 1});
+ my %args = validate( @_, { sequence_no => 1, index_file => 1 } );
my $chgidx = $args{index_file};
- my $rev = $args{'sequence_no'};
- my $index_record = substr( $$chgidx, ( $rev - 1 ) * CHG_RECORD_SIZE,
- CHG_RECORD_SIZE );
- my ( $seq, $orig_uuid, $orig_seq, $key )
- = unpack( 'Na16NH40', $index_record );
-
- $self->log(join(",", ( $seq, $orig_uuid, $orig_seq, $key )));
- $orig_uuid = Data::UUID->new->to_string($orig_uuid);
- $self->log( "REV: $rev - seq $seq - originally $orig_seq from "
- . substr( $orig_uuid, 0, 6 )
- . " data key $key" );
-
- # XXX: deserialize the changeset content from the cas with $key
- my $casfile = File::Spec->catfile( $self->changeset_cas_dir =>
- $self->_hashed_dir_name($key)
- );
+ my $rev = $args{'sequence_no'};
+ my $index_record
+ = substr( $$chgidx, ( $rev - 1 ) * CHG_RECORD_SIZE, CHG_RECORD_SIZE );
+ my ( $seq, $orig_uuid, $orig_seq, $key )
+ = unpack( 'Na16NH40', $index_record );
+
+ $self->log( join( ",", ( $seq, $orig_uuid, $orig_seq, $key ) ) );
+ $orig_uuid = Data::UUID->new->to_string($orig_uuid);
+ $self->log( "REV: $rev - seq $seq - originally $orig_seq from "
+ . substr( $orig_uuid, 0, 6 )
+ . " data key $key" );
- my $changeset = $self->_deserialize_changeset(
- content => $self->_read_file($casfile),
- original_source_uuid => $orig_uuid,
- original_sequence_no => $orig_seq,
- sequence_no => $seq
- );
+ # XXX: deserialize the changeset content from the cas with $key
+ my $casfile = File::Spec->catfile(
+ $self->changeset_cas_dir => $self->_hashed_dir_name($key) );
- return $changeset;
-}
+ my $changeset = $self->_deserialize_changeset(
+ content => $self->_read_file($casfile),
+ original_source_uuid => $orig_uuid,
+ original_sequence_no => $orig_seq,
+ sequence_no => $seq
+ );
+ return $changeset;
+}
sub traverse_changesets {
my $self = shift;
@@ -659,9 +650,9 @@
}
sub _read_changeset_index {
- my $self =shift;
+ my $self = shift;
$self->log("Reading changeset index file");
- my $chgidx = $self->_read_file( $self->changeset_index );
+ my $chgidx = $self->_read_file( $self->changeset_index );
return \$chgidx;
}
@@ -678,21 +669,27 @@
my $self = shift;
my %args = validate( @_, { uuid => 1, type => 1 } );
- my @record_index = $self->_read_record_index( type => $args{'type'}, uuid => $args{'uuid'});
+ my @record_index = $self->_read_record_index(
+ type => $args{'type'},
+ uuid => $args{'uuid'}
+ );
my $changeset_index = $self->_read_changeset_index();
my @changesets;
for my $item (@record_index) {
my $sequence = $item->[0];
- push @changesets, $self->_get_changeset_index_entry( sequence_no => $sequence, index_file => $changeset_index);
+ push @changesets,
+ $self->_get_changeset_index_entry(
+ sequence_no => $sequence,
+ index_file => $changeset_index
+ );
}
return @changesets;
}
-
sub _deserialize_changeset {
my $self = shift;
my %args = validate(
@@ -705,8 +702,8 @@
);
require Prophet::ChangeSet;
- my $content_struct = from_json( $args{content} , { utf8 => 1 });
- my $changeset = Prophet::ChangeSet->new_from_hashref($content_struct);
+ my $content_struct = from_json( $args{content}, { utf8 => 1 } );
+ my $changeset = Prophet::ChangeSet->new_from_hashref($content_struct);
$changeset->source_uuid( $self->uuid );
$changeset->sequence_no( $args{'sequence_no'} );
@@ -718,23 +715,26 @@
sub _get_changeset_index_handle {
my $self = shift;
- open( my $cs_file, ">>" . File::Spec->catfile( $self->fs_root => $self->changeset_index ) )
- || die $!;
+ open(
+ my $cs_file,
+ ">>" . File::Spec->catfile( $self->fs_root => $self->changeset_index )
+ ) || die $!;
return $cs_file;
}
sub _write_to_cas {
my $self = shift;
- my %args = validate( @_,
- { content_ref => 0, cas_dir => 1, data => 0 } );
+ my %args = validate( @_, { content_ref => 0, cas_dir => 1, data => 0 } );
my $content;
if ( $args{'content_ref'} ) {
$content = ${ $args{'content_ref'} };
} elsif ( $args{'data'} ) {
- $content = to_json($args{'data'}, { canonical => 1, pretty=> 0, utf8=>1} );
+ $content = to_json( $args{'data'},
+ { canonical => 1, pretty => 0, utf8 => 1 } );
}
- my $fingerprint = sha1_hex($content);
- my $content_filename = File::Spec->catfile( $args{'cas_dir'} => $self->_hashed_dir_name($fingerprint));
+ my $fingerprint = sha1_hex($content);
+ my $content_filename = File::Spec->catfile(
+ $args{'cas_dir'} => $self->_hashed_dir_name($fingerprint) );
$self->_write_file( path => $content_filename, content => $content );
return $fingerprint;
@@ -745,13 +745,15 @@
my %args = validate( @_, { path => 1, content => 1 } );
my $file = File::Spec->catfile( $self->fs_root => $args{'path'} );
- my (undef, $parent, $filename) = File::Spec->splitpath($file);
+ my ( undef, $parent, $filename ) = File::Spec->splitpath($file);
unless ( -d $parent ) {
- eval { mkpath([$parent])} ;
- if (my $msg = $@) { die "Failed to create directory " . $parent." - $msg";}
+ eval { mkpath( [$parent] ) };
+ if ( my $msg = $@ ) {
+ die "Failed to create directory " . $parent . " - $msg";
+ }
}
- open(my $fh, ">$file") || die $!;
+ open( my $fh, ">$file" ) || die $!;
print $fh scalar( $args{'content'} )
; # can't do "||" as we die if we print 0" || die "Could not write to " . $args{'path'} . " " . $!;
close $fh || die $!;
@@ -767,24 +769,29 @@
my $self = shift;
my ($file) = validate_pos( @_, 1 );
- if (! $self->fs_root ) {
+ if ( !$self->fs_root ) {
+
# HTTP Replica
return $self->_read_file($file) ? 1 : 0;
}
my $path = File::Spec->catfile( $self->fs_root, $file );
- if ( -f $path ) { return 1 }
- elsif ( -d $path ) { return 2 }
- else { return 0 }
+ if ( -f $path ) { return 1 }
+ elsif ( -d $path ) { return 2 }
+ else { return 0 }
}
sub read_file {
my $self = shift;
my ($file) = validate_pos( @_, 1 );
- if ($self->fs_root) {
+ if ( $self->fs_root ) {
+
# make sure we don't try to read files outside the replica
- my $qualified_file = Cwd::fast_abs_path(File::Spec->catfile( $self->fs_root => $file ));
- return undef if substr($qualified_file,0,length($self->fs_root)) ne $self->fs_root;
+ my $qualified_file = Cwd::fast_abs_path(
+ File::Spec->catfile( $self->fs_root => $file ) );
+ return undef
+ if substr( $qualified_file, 0, length( $self->fs_root ) ) ne
+ $self->fs_root;
}
return $self->_read_file($file);
}
@@ -795,21 +802,22 @@
if ( $self->fs_root ) {
return eval {
local $SIG{__DIE__} = 'DEFAULT';
- Prophet::Util->slurp (File::Spec->catfile( $self->fs_root => $file ))
+ Prophet::Util->slurp(
+ File::Spec->catfile( $self->fs_root => $file ) );
};
} else { # http replica
return LWP::Simple::get( $self->url . "/" . $file );
}
-
}
-
sub begin_edit {
my $self = shift;
- my %args = validate(@_, {
- source => 0, # the changeset that we're replaying, if applicable
- });
+ my %args = validate(
+ @_,
+ { source => 0, # the changeset that we're replaying, if applicable
+ }
+ );
my $source = $args{source};
@@ -817,13 +825,14 @@
my $created = $source && $source->created;
require Prophet::ChangeSet;
- my $changeset = Prophet::ChangeSet->new({
- source_uuid => $self->uuid,
- creator => $creator,
- $created ? (created => $created) : (),
- });
+ my $changeset = Prophet::ChangeSet->new(
+ { source_uuid => $self->uuid,
+ creator => $creator,
+ $created ? ( created => $created ) : (),
+ }
+ );
$self->current_edit($changeset);
- $self->current_edit_records([]);
+ $self->current_edit_records( [] );
}
@@ -845,8 +854,8 @@
$self->current_edit->original_source_uuid( $self->uuid )
unless ( $self->current_edit->original_source_uuid );
$self->current_edit->sequence_no($sequence);
- for my $record (@{$self->current_edit_records}) {
- $self->_write_record_index_entry(changeset_id => $sequence, %$record);
+ for my $record ( @{ $self->current_edit_records } ) {
+ $self->_write_record_index_entry( changeset_id => $sequence, %$record );
}
$self->_write_changeset_to_index( $self->current_edit );
}
@@ -908,7 +917,6 @@
my $inside_edit = $self->current_edit ? 1 : 0;
$self->begin_edit() unless ($inside_edit);
-
my $change = Prophet::Change->new(
{ record_type => $args{'type'},
record_uuid => $args{'uuid'},
@@ -916,8 +924,12 @@
}
);
$self->current_edit->add_change( change => $change );
-
- $self->_prepare_record_index_update( uuid => $args{uuid}, type => $args{type}, cas_key => '0'x40);
+
+ $self->_prepare_record_index_update(
+ uuid => $args{uuid},
+ type => $args{type},
+ cas_key => '0' x 40
+ );
$self->commit_edit() unless ($inside_edit);
return 1;
@@ -982,8 +994,8 @@
my %args = validate( @_, { uuid => 1, type => 1 } );
return undef unless $args{'uuid'};
return $self->_record_cas_filename(
- type => $args{'type'},
- uuid => $args{'uuid'}
+ type => $args{'type'},
+ uuid => $args{'uuid'}
) ? 1 : 0;
}
@@ -993,14 +1005,18 @@
my %args = validate( @_ => { type => 1 } );
#return just the filenames, which, File::Find::Rule doesn't seem capable of
- my @record_uuids = map { my @path = split( qr'/', $_ ); pop @path }
- File::Find::Rule->file->maxdepth(3)->in(
- File::Spec->catdir( $self->fs_root => $self->_record_type_root( $args{'type'} ) )
- );
-
-
+ my @record_uuids
+ = map { my @path = split( qr'/', $_ ); pop @path }
+ File::Find::Rule->file->maxdepth(3)->in(
+ File::Spec->catdir(
+ $self->fs_root => $self->_record_type_root( $args{'type'} )
+ )
+ );
- return [grep {$self->_record_cas_filename(type => $args{'type'}, uuid => $_ ) }@record_uuids
+ return [
+ grep {
+ $self->_record_cas_filename( type => $args{'type'}, uuid => $_ )
+ } @record_uuids
];
}
@@ -1030,7 +1046,8 @@
my $self = shift;
my %args = validate( @_, { path => 1 } );
- $self->_read_file(File::Spec->catfile($self->userdata_dir, $args{path}));
+ $self->_read_file(
+ File::Spec->catfile( $self->userdata_dir, $args{path} ) );
}
=head2 write_userdata_file
@@ -1044,7 +1061,7 @@
my %args = validate( @_, { path => 1, content => 1 } );
$self->_write_file(
- path => File::Spec->catfile($self->userdata_dir, $args{path}),
+ path => File::Spec->catfile( $self->userdata_dir, $args{path} ),
content => $args{content},
);
}
More information about the Bps-public-commit
mailing list