[Bps-public-commit] r11680 - in Prophet/trunk: .
jesse at bestpractical.com
Fri Apr 11 10:55:18 EDT 2008
Author: jesse
Date: Fri Apr 11 10:55:11 2008
New Revision: 11680
Added:
Prophet/trunk/lib/Prophet/ReplicaExporter.pm
Modified:
Prophet/trunk/ (props changed)
Prophet/trunk/lib/Prophet/Handle.pm
Prophet/trunk/lib/Prophet/Replica.pm
Log:
r29596 at 104: jesse | 2008-04-11 10:54:41 -0400
* Extracted the replica exporter. Started to clean up the handle slightly, in preparation for merging it into the replica.
Modified: Prophet/trunk/lib/Prophet/Handle.pm
==============================================================================
--- Prophet/trunk/lib/Prophet/Handle.pm (original)
+++ Prophet/trunk/lib/Prophet/Handle.pm Fri Apr 11 10:55:11 2008
@@ -23,25 +23,6 @@
return Prophet::Handle::SVN->new(@_);
}
-=head2 integrate_changeset L<Prophet::ChangeSet>
-
-Given a L<Prophet::ChangeSet>, integrates each and every change within that changeset into the handle's replica.
-
-This routine also records that we've seen this changeset (and hence everything before it) from both the peer who sent it to us AND the replica who originally created it.
-
-
-=cut
-
-sub integrate_changeset {
- my $self = shift;
- my ($changeset) = validate_pos(@_, { isa => 'Prophet::ChangeSet'});
-
- $self->begin_edit();
- $self->record_changeset($changeset);
- $self->record_changeset_integration($changeset);
- $self->commit_edit();
-}
-
=head2 record_resolutions Prophet::ChangeSet, (Prophet::Handle, a resolution database handle)
Given a resolution changeset and a resolution database handle,
@@ -89,6 +70,29 @@
);
}
+
+=head1 Routines dealing with integrating changesets into a replica
+
+
+=head2 integrate_changeset L<Prophet::ChangeSet>
+
+Given a L<Prophet::ChangeSet>, integrates each and every change within that changeset into the handle's replica.
+
+This routine also records that we've seen this changeset (and hence everything before it) from both the peer who sent it to us AND the replica who originally created it.
+
+
+=cut
+
+sub integrate_changeset {
+ my $self = shift;
+ my ($changeset) = validate_pos(@_, { isa => 'Prophet::ChangeSet'});
+
+ $self->begin_edit();
+ $self->record_changeset($changeset);
+ $self->record_changeset_integration($changeset);
+ $self->commit_edit();
+}
+
=head2 record_changeset Prophet::ChangeSet
Inside an edit (transaction), integrate all changes in this transaction
@@ -99,14 +103,11 @@
sub record_changeset {
my $self = shift;
my ($changeset) = validate_pos(@_, { isa => 'Prophet::ChangeSet'});
-
eval {
-
my $inside_edit = $self->current_edit ? 1 : 0;
$self->begin_edit() unless ($inside_edit);
$self->_integrate_change($_) for ( $changeset->changes );
$self->_post_process_integrated_changeset($changeset);
-
$self->commit_edit() unless ($inside_edit);
};
die($@) if ($@);
@@ -117,25 +118,13 @@
my ($change) = validate_pos(@_, { isa => 'Prophet::Change'});
my %new_props = map { $_->name => $_->new_value } $change->prop_changes;
-
if ( $change->change_type eq 'add_file' ) {
- $self->create_node(
- type => $change->node_type,
- uuid => $change->node_uuid,
- props => \%new_props
- );
+ $self->create_node( type => $change->node_type, uuid => $change->node_uuid, props => \%new_props);
} elsif ( $change->change_type eq 'add_dir' ) {
} elsif ( $change->change_type eq 'update_file' ) {
- $self->set_node_props(
- type => $change->node_type,
- uuid => $change->node_uuid,
- props => \%new_props
- );
+ $self->set_node_props( type => $change->node_type, uuid => $change->node_uuid, props => \%new_props);
} elsif ( $change->change_type eq 'delete' ) {
- $self->delete_node(
- type => $change->node_type,
- uuid => $change->node_uuid
- );
+ $self->delete_node( type => $change->node_type, uuid => $change->node_uuid);
} else {
Carp::confess( " I have never heard of the change type: " . $change->change_type );
}
@@ -168,6 +157,16 @@
return $self->_record_metadata_for( $MERGETICKET_METATYPE, $source_uuid, 'last-changeset', $sequence_no );
}
+
+
+
+
+
+=head1 metadata storage routines
+
+=cut
+
+
=head2 metadata_storage $RECORD_TYPE, $PROPERTY_NAME
Returns a function which takes a UUID and an optional value to get (or set) metadata rows in a metadata table.
@@ -217,6 +216,10 @@
);
}
+
+
+
+
=head1 DATA STORE API
=head1 The following functions need to be implemented by any Prophet backing store.
@@ -285,9 +288,29 @@
+
+
=head2 The following functions need to be implemented by any _writable_ prophet backing store
=cut
+
+=head2 The following optional routines are provided for you to override with backing-store specific behaviour
+
+
+=head3 _post_process_integrated_changeset Prophet::ChangeSet
+
+Called after the replica has integrated a new changeset but before closing the current transaction/edit.
+
+The SVN backend, for example, uses this to record author metadata about this changeset.
+
+=cut
+
+
+sub _post_process_integrated_changeset {
+ return 1;
+}
+
+
1;
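
A minimal usage sketch of the relocated integrate_changeset, assuming
$handle is a Prophet::Handle instance and $changeset is a
Prophet::ChangeSet received from a peer (neither is part of this diff):

    # Records the changeset, then its merge-ticket metadata, in one edit.
    $handle->integrate_changeset($changeset);
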
Modified: Prophet/trunk/lib/Prophet/Replica.pm
==============================================================================
--- Prophet/trunk/lib/Prophet/Replica.pm (original)
+++ Prophet/trunk/lib/Prophet/Replica.pm Fri Apr 11 10:55:11 2008
@@ -46,7 +46,6 @@
=cut
-
sub register_replica_scheme {
my $class = shift;
my %args = validate(@_, { class => 1, scheme => 1});
@@ -601,187 +600,10 @@
sub export_to {
my $self = shift;
my %args = validate( @_, { path => 1, } );
+ Prophet::ReplicaExporter->require();
- my $replica_root = dir( $args{path}, $self->db_uuid );
- my $cas_dir = dir( $replica_root => 'cas' );
- my $record_cas_dir = dir( $cas_dir => 'records' );
- my $changeset_cas_dir = dir( $cas_dir => 'changesets' );
- my $record_dir = dir( $replica_root => 'records' );
-
- _mkdir( $args{path} );
- _mkdir($replica_root);
- _mkdir($record_dir);
- _mkdir($cas_dir);
- make_tiered_dirs($record_cas_dir);
- make_tiered_dirs($changeset_cas_dir);
-
- $self->_init_export_metadata( root => $replica_root );
-
- foreach my $type ( @{ $self->prophet_handle->enumerate_types } ) {
- $self->export_records(
- type => $type,
- root => $replica_root,
- cas_dir => $record_cas_dir
- );
- }
-
- $self->export_changesets( root => $replica_root, cas_dir => $changeset_cas_dir );
-
- #$self->export_resolutions( path => dir( $replica_root, 'resolutions'), resdb_handle => $args{'resdb_handle'} );
-
-}
-
-sub export_resolutions {
- my $self = shift;
- my $replica = Prophet::Replica->new();
-
- # ...
-}
-
-sub _init_export_metadata {
- my $self = shift;
- my %args = validate( @_, { root => 1 } );
-
- $self->_output_oneliner_file( path => file( $args{'root'}, 'replica-uuid' ), content => $self->uuid );
- $self->_output_oneliner_file( path => file( $args{'root'}, 'replica-version' ), content => '1' );
- $self->_output_oneliner_file(
- path => file( $args{'root'}, 'latest-sequence-no' ),
- content => $self->most_recent_changeset
- );
-
-}
-
-sub export_records {
- my $self = shift;
- my %args = validate( @_, { root => 1, type => 1, cas_dir => 1 } );
-
- make_tiered_dirs( dir( $args{'root'} => 'records' => $args{'type'} ) );
-
- my $collection = Prophet::Collection->new(
- handle => $self->prophet_handle,
- type => $args{type}
- );
- $collection->matching( sub {1} );
- $self->export_record(
- record_dir => dir( $args{'root'}, 'records', $_->type ),
- cas_dir => $args{'cas_dir'},
- record => $_
- ) for @$collection;
-
-}
-
-sub export_record {
- my $self = shift;
- my %args = validate(
- @_,
- { record => { isa => 'Prophet::Record' },
- record_dir => 1,
- cas_dir => 1,
- }
- );
-
- my $content = YAML::Syck::Dump( $args{'record'}->get_props );
- my ($cas_key) = $self->_write_to_cas(
- content_ref => \$content,
- cas_dir => $args{'cas_dir'}
- );
-
- my $idx_filename = file(
- $args{'record_dir'},
- substr( $args{record}->uuid, 0, 1 ),
- substr( $args{record}->uuid, 1, 1 ),
- $args{record}->uuid
- );
-
- open( my $record_index, ">>", $idx_filename ) || die $!;
-
- # XXX TODO: skip if the index already has this version of the record;
- # XXX TODO FETCH THAT
- my $record_last_changed_changeset = 1;
-
- my $index_row = pack( 'NH40', $record_last_changed_changeset, $cas_key );
- print $record_index $index_row || die $!;
- close $record_index;
-}
-
-sub export_changesets {
- my $self = shift;
- my %args = validate( @_, { root => 1, cas_dir => 1 } );
-
- open( my $cs_file, ">" . file( $args{'root'}, 'changesets.idx' ) ) || die $!;
-
- foreach my $changeset ( @{ $self->fetch_changesets( after => 0 ) } ) {
- my $hash_changeset = $changeset->as_hash;
- delete $hash_changeset->{'sequence_no'};
- delete $hash_changeset->{'source_uuid'};
-
- my $content = YAML::Syck::Dump($hash_changeset);
- my $cas_key = $self->_write_to_cas(
- content_ref => \$content,
- cas_dir => $args{'cas_dir'}
- );
-
- # XXX TODO we should only actually be encoding the sha1 of content once
- # and then converting. this is wasteful
-
- my $packed_cas_key = sha1($content);
-
- print $cs_file pack( 'Na16Na20',
- $changeset->sequence_no,
- Data::UUID->new->from_string( $changeset->original_source_uuid ),
- $changeset->original_sequence_no,
- $packed_cas_key )
- || die $!;
-
- }
-
- close($cs_file);
-}
-
-sub _mkdir {
- my $path = shift;
- unless ( -d $path ) {
- mkdir($path) || die $@;
- }
- unless ( -w $path ) {
- die "$path not writable";
- }
-
-}
-
-sub make_tiered_dirs {
- my $base = shift;
- _mkdir( dir($base) );
- for my $a ( 0 .. 9, 'a' .. 'f' ) {
- _mkdir( dir( $base => $a ) );
- for my $b ( 0 .. 9, 'a' .. 'f' ) {
- _mkdir( dir( $base => $a => $b ) );
- }
- }
-
-}
-
-sub _write_to_cas {
- my $self = shift;
- my %args = validate( @_, { content_ref => 1, cas_dir => 1 } );
-
- my $content = ${ $args{'content_ref'} };
- my $fingerprint = sha1_hex($content);
- my $content_filename
- = file( $args{'cas_dir'}, substr( $fingerprint, 0, 1 ), substr( $fingerprint, 1, 1 ), $fingerprint );
- open( my $output, ">", $content_filename ) || die "Could not open $content_filename";
- print $output $content || die $!;
- close $output;
- return $fingerprint;
-}
-
-sub _output_oneliner_file {
- my $self = shift;
- my %args = validate( @_, { path => 1, content => 1 } );
-
- open( my $file, ">", $args{'path'} ) || die $!;
- print $file $args{'content'} || die $!;
- close $file || die $!;
+ my $exporter = Prophet::ReplicaExporter->new({target_path => $args{'path'}, replica => $self});
+ $exporter->export();
}
1;
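
A minimal sketch of the new export code path, assuming $replica is an
existing Prophet::Replica instance and a hypothetical target path:

    # export_to is now a thin shim around the extracted class:
    $replica->export_to( path => '/path/to/mirror' );

    # ...which is equivalent to calling the exporter directly:
    Prophet::ReplicaExporter->require();
    my $exporter = Prophet::ReplicaExporter->new(
        { target_path => '/path/to/mirror', replica => $replica } );
    $exporter->export();
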
Added: Prophet/trunk/lib/Prophet/ReplicaExporter.pm
==============================================================================
--- (empty file)
+++ Prophet/trunk/lib/Prophet/ReplicaExporter.pm Fri Apr 11 10:55:11 2008
@@ -0,0 +1,345 @@
+use warnings;
+use strict;
+
+package Prophet::ReplicaExporter;
+use base qw/Class::Accessor/;
+use Params::Validate qw(:all);
+use Path::Class;
+use Digest::SHA1 qw(sha1 sha1_hex);
+use YAML::Syck;
+use UNIVERSAL::require;
+
+
+__PACKAGE__->mk_accessors(qw( replica target_path));
+
+=head1 NAME
+
+Prophet::ReplicaExporter
+
+=head1 DESCRIPTION
+
+A utility class which exports a replica to a serialized on-disk format
+
+
+=cut
+
+=head1 METHODS
+
+=head2 new
+
+Instantiates a new replica exporter object
+
+=cut
+
+
+=head2 export
+
+This routine will export a copy of this Prophet database replica to a set of flat files on disk, suitable for
+publishing via HTTP or over a local filesystem for other Prophet replicas to clone or incorporate changes from.
+
+
+=head3 text-dump replica format
+
+=head4 overview
+
+ $URL
+ /<db-uuid>/
+ /replica-uuid
+ /latest-sequence-no
+ /replica-version
+     /cas/records/<substr(sha1,0,1)>/<substr(sha1,1,1)>/<sha1>
+     /cas/changesets/<substr(sha1,0,1)>/<substr(sha1,1,1)>/<sha1>
+ /records (optional?)
+      /<record type> (for resolutions this is actually _prophet-resolution-<cas-key>)
+ /<record uuid> which is a file containing a list of 0 or more rows
+ last-changed-sequence-no : cas key
+
+ /changesets.idx
+
+ index which has records:
+ each record is : local-replica-seq-no : original-uuid : original-seq-no : cas key
+ ...
+
+ /resolutions/
+ /replica-uuid
+ /latest-sequence-no
+     /cas/<substr(sha1,0,1)>/<substr(sha1,1,1)>/<sha1>
+ /content (optional?)
+      /_prophet-resolution-<cas-key> (cas-key == a hash of the conflicting change)
+ /<record uuid> (record uuid == the originating replica)
+ last-changed-sequence-no : <cas key to the content of the resolution>
+
+ /changesets.idx
+ index which has records:
+ each record is : local-replica-seq-no : original-uuid : original-seq-no : cas key
+ ...
+
+
+Inside the top-level directory of the mirror, you'll find a directory named with B<a hex-encoded UUID>.
+This directory is the root of the published replica. The UUID uniquely identifies the database being replicated.
+All replicas of this database will share the same UUID.
+
+Inside the B<<db-uuid>> directory are the files and directories that make up the actual content of the database replica:
+
+=over 2
+
+=item C<replica-uuid>
+
+Contains the replica's hex-encoded UUID.
+
+=item C<replica-version>
+
+Contains a single integer that defines the replica format.
+
+The current replica version is 1.
+
+=item C<latest-sequence-no>
+
+Contains a single integer, the replica's most recent sequence number.
+
+=item C<cas/records>
+
+=item C<cas/changesets>
+
+The C<cas> directory holds changesets and records, each keyed by a
+hex-encoded hash of the item's content. Inside the C<cas> directory, you'll find
+a two-level deep directory tree of single-character hex digits.
+You'll find the changeset with the sha1 digest C<f4b7489b21f8d107ad8df78750a410c028abbf6c>
+inside C<cas/changesets/f/4/f4b7489b21f8d107ad8df78750a410c028abbf6c>.
+
+You'll find the record with the sha1 digest C<dd6fb674de879a1a4762d690141cdfee138daf65> inside
+C<cas/records/d/d/dd6fb674de879a1a4762d690141cdfee138daf65>.
+
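+For illustration, the path for a given blob can be derived from its
+content hash alone; a hypothetical sketch mirroring C<_write_to_cas>
+below (assumes C<$content> and C<$cas_dir>; C<sha1_hex> and C<file>
+come from this module's own imports):
+
+    my $fingerprint = sha1_hex($content);
+    my $blob_path   = file( $cas_dir,
+        substr( $fingerprint, 0, 1 ),
+        substr( $fingerprint, 1, 1 ),
+        $fingerprint );
+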
+
+TODO: define the format for changesets and records
+
+
+=item C<records>
+
+Files inside the C<records> directory are index files which list off all published versions of a record and the key necessary to retrieve the record from the I<content-addressed store>.
+
+Inside the C<records> directory, you'll find directories named for each
+C<type> in your database. Inside each C<type> directory, you'll find a two-level directory tree of single hexadecimal digits. You'll find the record with the type C<Foo> and the UUID C<29A3CA16-03C5-11DD-9AE0-E25CFCEE7EC4> stored in
+
+ records/Foo/2/9/29A3CA16-03C5-11DD-9AE0-E25CFCEE7EC4
+
+
+The format of record files is:
+
+ <unsigned-long-int: last-changed-sequence-no><40 chars of hex: cas key>
+
+The file is sorted in ascending order by revision id.
+
+
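+For illustration, one 24-byte row could be unpacked with the template the
+exporter uses to write it; a hypothetical sketch (C<$row> is assumed):
+
+    my ( $last_changed_seq, $cas_key ) = unpack( 'NH40', $row );
+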
+=item C<changesets.idx>
+
+The C<changesets.idx> file lists each changeset in this replica and
+provides an index into the B<content-addressed storage> to fetch
+the content of the changeset.
+
+The format of the changeset index file is:
+
+ <unsigned-long-int: sequence-no><16 bytes: changeset original source uuid><unsigned-long-int: changeset original source sequence no><20 bytes: cas key - sha1 sum of the changeset's content>
+
+The file is sorted in ascending order by revision id.
+
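+For illustration, one 44-byte row could be decoded with the matching
+template; a hypothetical sketch (C<$row> is assumed):
+
+    my ( $seq, $orig_uuid, $orig_seq, $cas_key ) = unpack( 'Na16Na20', $row );
+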
+
+=item C<resolutions>
+
+=over 2
+
+=item TODO DOC RESOLUTIONS
+
+
+=back
+
+=back
+
+=cut
+
+sub export {
+ my $self = shift;
+
+ my $replica_root = dir( $self->target_path, $self->replica->db_uuid );
+ my $cas_dir = dir( $replica_root => 'cas' );
+ my $record_cas_dir = dir( $cas_dir => 'records' );
+ my $changeset_cas_dir = dir( $cas_dir => 'changesets' );
+ my $record_dir = dir( $replica_root => 'records' );
+
+ _mkdir( $self->target_path);
+ _mkdir($replica_root);
+ _mkdir($record_dir);
+ _mkdir($cas_dir);
+ make_tiered_dirs($record_cas_dir);
+ make_tiered_dirs($changeset_cas_dir);
+
+ $self->_init_export_metadata( root => $replica_root );
+
+ foreach my $type ( @{ $self->replica->prophet_handle->enumerate_types } ) {
+ $self->export_records(
+ type => $type,
+ root => $replica_root,
+ cas_dir => $record_cas_dir
+ );
+ }
+
+ $self->export_changesets( root => $replica_root, cas_dir => $changeset_cas_dir );
+
+ #$self->export_resolutions( path => dir( $replica_root, 'resolutions'), resdb_handle => $args{'resdb_handle'} );
+
+}
+
+sub export_resolutions {
+ my $self = shift;
+ my $replica = Prophet::Replica->new();
+
+ # ...
+}
+
+sub _init_export_metadata {
+ my $self = shift;
+ my %args = validate( @_, { root => 1 } );
+
+ $self->_output_oneliner_file( path => file( $args{'root'}, 'replica-uuid' ), content => $self->replica->uuid );
+ $self->_output_oneliner_file( path => file( $args{'root'}, 'replica-version' ), content => '1' );
+ $self->_output_oneliner_file(
+ path => file( $args{'root'}, 'latest-sequence-no' ),
+ content => $self->replica->most_recent_changeset
+ );
+
+}
+
+sub export_records {
+ my $self = shift;
+ my %args = validate( @_, { root => 1, type => 1, cas_dir => 1 } );
+
+ make_tiered_dirs( dir( $args{'root'} => 'records' => $args{'type'} ) );
+
+ my $collection = Prophet::Collection->new(
+ handle => $self->replica->prophet_handle,
+ type => $args{type}
+ );
+ $collection->matching( sub {1} );
+ $self->export_record(
+ record_dir => dir( $args{'root'}, 'records', $_->type ),
+ cas_dir => $args{'cas_dir'},
+ record => $_
+ ) for @$collection;
+
+}
+
+sub export_record {
+ my $self = shift;
+ my %args = validate(
+ @_,
+ { record => { isa => 'Prophet::Record' },
+ record_dir => 1,
+ cas_dir => 1,
+ }
+ );
+
+ my $content = YAML::Syck::Dump( $args{'record'}->get_props );
+ my ($cas_key) = $self->_write_to_cas(
+ content_ref => \$content,
+ cas_dir => $args{'cas_dir'}
+ );
+
+ my $idx_filename = file(
+ $args{'record_dir'},
+ substr( $args{record}->uuid, 0, 1 ),
+ substr( $args{record}->uuid, 1, 1 ),
+ $args{record}->uuid
+ );
+
+ open( my $record_index, ">>", $idx_filename ) || die $!;
+
+ # XXX TODO: skip if the index already has this version of the record;
+ # XXX TODO FETCH THAT
+ my $record_last_changed_changeset = 1;
+
+ my $index_row = pack( 'NH40', $record_last_changed_changeset, $cas_key );
+    print $record_index $index_row or die $!;
+ close $record_index;
+}
+
+sub export_changesets {
+ my $self = shift;
+ my %args = validate( @_, { root => 1, cas_dir => 1 } );
+
+ open( my $cs_file, ">" . file( $args{'root'}, 'changesets.idx' ) ) || die $!;
+
+ foreach my $changeset ( @{ $self->replica->fetch_changesets( after => 0 ) } ) {
+ my $hash_changeset = $changeset->as_hash;
+ delete $hash_changeset->{'sequence_no'};
+ delete $hash_changeset->{'source_uuid'};
+
+ my $content = YAML::Syck::Dump($hash_changeset);
+ my $cas_key = $self->_write_to_cas(
+ content_ref => \$content,
+ cas_dir => $args{'cas_dir'}
+ );
+
+ # XXX TODO we should only actually be encoding the sha1 of content once
+ # and then converting. this is wasteful
+
+ my $packed_cas_key = sha1($content);
+
+ print $cs_file pack( 'Na16Na20',
+ $changeset->sequence_no,
+ Data::UUID->new->from_string( $changeset->original_source_uuid ),
+ $changeset->original_sequence_no,
+ $packed_cas_key )
+        or die $!;
+
+ }
+
+ close($cs_file);
+}
+
+sub _mkdir {
+ my $path = shift;
+ unless ( -d $path ) {
+        mkdir($path) || die $!;
+ }
+ unless ( -w $path ) {
+ die "$path not writable";
+ }
+
+}
+
+sub make_tiered_dirs {
+ my $base = shift;
+ _mkdir( dir($base) );
+ for my $a ( 0 .. 9, 'a' .. 'f' ) {
+ _mkdir( dir( $base => $a ) );
+ for my $b ( 0 .. 9, 'a' .. 'f' ) {
+ _mkdir( dir( $base => $a => $b ) );
+ }
+ }
+
+}
+
+sub _write_to_cas {
+ my $self = shift;
+ my %args = validate( @_, { content_ref => 1, cas_dir => 1 } );
+
+ my $content = ${ $args{'content_ref'} };
+ my $fingerprint = sha1_hex($content);
+ my $content_filename
+ = file( $args{'cas_dir'}, substr( $fingerprint, 0, 1 ), substr( $fingerprint, 1, 1 ), $fingerprint );
+ open( my $output, ">", $content_filename ) || die "Could not open $content_filename";
+    print $output $content or die $!;
+ close $output;
+ return $fingerprint;
+}
+
+sub _output_oneliner_file {
+ my $self = shift;
+ my %args = validate( @_, { path => 1, content => 1 } );
+
+ open( my $file, ">", $args{'path'} ) || die $!;
+    print $file $args{'content'} or die $!;
+ close $file || die $!;
+}
+
+1;