[Bps-public-commit] Prophet branch, master, updated. fc1ae46803313c2b1f168cfa2013ca0d98752557

jesse jesse at bestpractical.com
Mon Jun 8 15:59:02 EDT 2009


The branch, master has been updated
       via  fc1ae46803313c2b1f168cfa2013ca0d98752557 (commit)
       via  09f265ff70198b6e775bb88cbc96274694a972c7 (commit)
       via  350524a752991fbf727bbfc68a589b3b6dfd391f (commit)
       via  7a1917222ec4fdc4c5c702613e1e56c98161b976 (commit)
       via  c1f0f11fa6995513243947395b71fb692414071f (commit)
      from  a9677ee0b3efe74d36118c63fe9e747562c9fbe0 (commit)

Summary of changes:
 lib/Prophet/CLI/Command/Merge.pm |   40 ++++++++++++++++++++------
 lib/Prophet/FilesystemReplica.pm |   39 +++++++++++++++++--------
 lib/Prophet/ForeignReplica.pm    |   28 ------------------
 lib/Prophet/Replica.pm           |   58 ++++++++++++++++++++++++-------------
 lib/Prophet/Replica/sqlite.pm    |   27 +++++++++++++++--
 5 files changed, 119 insertions(+), 73 deletions(-)

- Log -----------------------------------------------------------------
commit c1f0f11fa6995513243947395b71fb692414071f
Author: Jesse Vincent <jesse at bestpractical.com>
Date:   Mon Jun 8 15:54:51 2009 -0400

    Remove outdated duplicate code

diff --git a/lib/Prophet/ForeignReplica.pm b/lib/Prophet/ForeignReplica.pm
index 1079b4b..b62ce18 100644
--- a/lib/Prophet/ForeignReplica.pm
+++ b/lib/Prophet/ForeignReplica.pm
@@ -91,34 +91,6 @@ sub prompt_for_login {
     return ( $username, $password );
 }
 
-
-=head2 has_seen_changeset Prophet::ChangeSet
-
-This is a simplification of L<Prophet::Replica/has_seen_changeset>. Because
-only a single Prophet replica is talking to this foreign replica, we only need
-to care about whether that replica (not the original replica) has given us the
-changeset.
-
-=cut
-
-sub has_seen_changeset {
-    my $self = shift;
-    my ($changeset) = validate_pos( @_, { isa => "Prophet::ChangeSet" } );
-
-    # Has our host replica given this changeset to us yet?
-    # XXX TODO - should actually be checking the changeset id and the record id in a lookup table
-    # of all the changesets that may have come from the source
-    #
-    if ($changeset->original_source_uuid eq $self->uuid) { return 1}
-
-    if ($self->last_changeset_from_source($changeset->original_source_uuid) >= $changeset->original_sequence_no) { 
-        # XXX TODO - don't need this, right? || $self->last_changeset_from_source($changeset->source_uuid) >= $changeset->sequence_no ) {
-        return 1;
-    } else {
-        return 0;
-    }
-}
-
 sub log {
     my $self = shift;
     my ($msg) = validate_pos(@_, 1);

commit 7a1917222ec4fdc4c5c702613e1e56c98161b976
Author: Jesse Vincent <jesse at bestpractical.com>
Date:   Mon Jun 8 15:55:25 2009 -0400

    Added a "before_load_changeset_callback" to filesystem replica types

diff --git a/lib/Prophet/FilesystemReplica.pm b/lib/Prophet/FilesystemReplica.pm
index 8c57b38..1506036 100644
--- a/lib/Prophet/FilesystemReplica.pm
+++ b/lib/Prophet/FilesystemReplica.pm
@@ -155,11 +155,12 @@ sub traverse_changesets {
     my $self = shift;
     my %args = validate(
         @_,
-        {   after    => 1,
-            callback => { type => CODEREF} ,
-            reporting_callback => { type => CODEREF, optional => 1 },
-            until    => 0,
-            reverse  => 0,
+        {   after                          => 1,
+            callback                       => { type => CODEREF },
+            before_load_changeset_callback => { type => CODEREF, optional => 1 },
+            reporting_callback             => { type => CODEREF, optional => 1 },
+            until                          => 0,
+            reverse                        => 0,
             load_changesets => { default => 1 }
         }
     );
@@ -167,35 +168,49 @@ sub traverse_changesets {
     my $first_rev = ( $args{'after'} + 1 ) || 1;
     my $latest = $self->latest_sequence_no;
 
-    if ( defined $args{until} && $args{until} < $latest) {
-            $latest = $args{until};
+    if ( defined $args{until} && $args{until} < $latest ) {
+        $latest = $args{until};
     }
 
     my $chgidx = $self->read_changeset_index;
     $self->log_debug("Traversing changesets between $first_rev and $latest");
     my @range = ( $first_rev .. $latest );
     @range = reverse @range if $args{reverse};
-    for my $rev ( @range ) {
+    for my $rev (@range) {
         $self->log_debug("Fetching changeset $rev");
+
+        if ( $args{'before_load_changeset_callback'} ) {
+            my $continue = $args{'before_load_changeset_callback'}->(
+                changeset_metadata => $self->_changeset_index_entry(
+                    sequence_no => $rev,
+                    index_file  => $chgidx
+                )
+            );
+
+            next unless $continue;
+
+        }
+
         my $data;
         if ( $args{load_changesets} ) {
             $data = $self->_get_changeset_via_index(
                 sequence_no => $rev,
                 index_file  => $chgidx
             );
-            $args{callback}->( changeset => $data);
+            $args{callback}->( changeset => $data );
         } else {
-           $data = $self->_changeset_index_entry(
+            $data = $self->_changeset_index_entry(
                 sequence_no => $rev,
                 index_file  => $chgidx
             );
-            $args{callback}->( changeset_metadata => $data);
+            $args{callback}->( changeset_metadata => $data );
 
         }
-        $args{reporting_callback}->($data) if ($args{reporting_callback});
+        $args{reporting_callback}->($data) if ( $args{reporting_callback} );
 
     }
 }
+
 sub _changeset_index_entry {
     my $self = shift;
     my %args = validate( @_, { sequence_no => 1, index_file => 1 } );

commit 350524a752991fbf727bbfc68a589b3b6dfd391f
Author: Jesse Vincent <jesse at bestpractical.com>
Date:   Mon Jun 8 15:56:01 2009 -0400

    Update replica base class to be able to skip changesets for integration without loading changesets at all

diff --git a/lib/Prophet/Replica.pm b/lib/Prophet/Replica.pm
index 51a8807..b7895b8 100644
--- a/lib/Prophet/Replica.pm
+++ b/lib/Prophet/Replica.pm
@@ -161,13 +161,14 @@ sub import_changesets {
     my $self = shift;
     my %args = validate(
         @_,
-        {   from               => { isa      => 'Prophet::Replica' },
-            resdb              => { optional => 1 },
-            resolver           => { optional => 1 },
-            resolver_class     => { optional => 1 },
-            conflict_callback  => { optional => 1 },
-            reporting_callback => { optional => 1 },
-            force              => { optional => 1 },
+        {   from                           => { isa      => 'Prophet::Replica' },
+            resdb                          => { optional => 1 },
+            resolver                       => { optional => 1 },
+            resolver_class                 => { optional => 1 },
+            conflict_callback              => { type => CODEREF, optional => 1 },
+            reporting_callback             => { type => CODEREF, optional => 1 },
+            before_load_changeset_callback => { type => CODEREF, optional => 1 },
+            force                          => { optional => 1 },
         }
     );
 
@@ -180,23 +181,23 @@ sub import_changesets {
     $self->log_debug("Integrating changesets from ".$source->uuid. " after ". $self->last_changeset_from_source( $self->uuid ));
 
     $source->traverse_changesets(
-        after    => $self->last_changeset_from_source( $self->uuid ),
-        callback => sub {
+        after                          => $self->last_changeset_from_source( $self->uuid ),
+        ($args{before_load_changeset_callback} ? (before_load_changeset_callback => $args{before_load_changeset_callback}) : ()),
+        callback                       => sub {
             my %callback_args = (@_);
             $self->integrate_changeset(
                 changeset          => $callback_args{changeset},
-                conflict_callback  => $args{conflict_callback},
+                conflict_callback  => $args{'conflict_callback'},
                 reporting_callback => $args{'reporting_callback'},
-                resolver           => $args{resolver},
+                resolver           => $args{'resolver'},
                 resolver_class     => $args{'resolver_class'},
                 resdb              => $args{'resdb'},
             );
 
-            if (ref ($callback_args{after_integrate_changeset})) {
-                $callback_args{after_integrate_changeset}->(changeset => $callback_args{changeset});
+            if ( ref( $callback_args{'after_integrate_changeset'} ) ) {
+                $callback_args{'after_integrate_changeset'}->( changeset => $callback_args{'changeset'} );
             }
 
-
         }
     );
 }
@@ -230,6 +231,21 @@ sub import_resolutions_from_remote_replica {
 
     $self->resolution_db_handle->import_changesets(
         from     => $source->resolution_db_handle,
+        before_load_changeset_callback  => sub {
+                my %args = (@_);
+                my ($seq, $orig_uuid, $orig_seq, $key) = @{$args{changeset_metadata}};
+                # skip changesets we've seen before
+                if (
+                $self->resolution_db_handle->has_seen_changeset( source_uuid => $orig_uuid,
+                                           sequence_no => $orig_seq) ){
+                        return undef;
+                } else {
+                    return 1;
+                }
+
+            },
+
+
         resolver => sub { die "not implemented yet" },
         force    => $args{force},
     );
@@ -378,7 +394,7 @@ sub last_changeset_from_source {
 }
 
 
-=head3 has_seen_changeset L<Prophet::ChangeSet>
+=head3 has_seen_changeset { source_uuid => <uuid>, sequence_no => <int> }
 
 Returns true if we've previously integrated this changeset, even if we
 originally received it from a different peer.
@@ -387,20 +403,20 @@ originally received it from a different peer.
 
 sub has_seen_changeset {
     my $self = shift;
-    my ($changeset) = validate_pos( @_, { isa => "Prophet::ChangeSet" } );
+    my %args = validate( @_, {source_uuid => 1, sequence_no => 1});
     $self->log_debug("Checking to see if we've ever seen changeset " .
-        $changeset->original_sequence_no . " from " .
-        $self->display_name_for_uuid($changeset->original_source_uuid));
+        $args{sequence_no} . " from " .
+        $self->display_name_for_uuid($args{source_uuid}));
 
     # If the changeset originated locally, we never want it
-    if  ($changeset->original_source_uuid eq $self->uuid ) {
+    if ($args{source_uuid} eq $self->uuid ) {
         $self->log_debug("\t  - We have. (It originated locally.)");
         return 1 
     }
     # Otherwise, if the we have a merge ticket from the source, we don't want
     # the changeset if the source's sequence # is >= the changeset's sequence
     # #, we can safely skip it
-    elsif ( $self->last_changeset_from_source( $changeset->original_source_uuid ) >= $changeset->original_sequence_no ) {
+    elsif ( $self->last_changeset_from_source( $args{source_uuid} ) >= $args{sequence_no} ) {
         $self->log_debug("\t  - We have seen this or a more recent changeset from remote.");
         return 1;
     } else {
@@ -484,7 +500,7 @@ sub should_accept_changeset {
         " from ".$self->display_name_for_uuid($changeset->original_source_uuid));
     return undef if (! $changeset->has_changes);
     return undef if ( $changeset->is_nullification || $changeset->is_resolution );
-    return undef if $self->has_seen_changeset( $changeset );
+    return undef if $self->has_seen_changeset( sequence_no => $changeset->original_sequence_no,  source_uuid => $changeset->original_source_uuid );
     $self->log_debug("Yes, it has changes, isn't a nullification and I haven't seen it before");
 
     return 1;

commit 09f265ff70198b6e775bb88cbc96274694a972c7
Author: Jesse Vincent <jesse at bestpractical.com>
Date:   Mon Jun 8 15:57:26 2009 -0400

    Gave the sqlite replica type the ability to generate changeset index entries; added support for the new callback

diff --git a/lib/Prophet/Replica/sqlite.pm b/lib/Prophet/Replica/sqlite.pm
index 3569f95..ac9bdcd 100644
--- a/lib/Prophet/Replica/sqlite.pm
+++ b/lib/Prophet/Replica/sqlite.pm
@@ -408,6 +408,9 @@ sub traverse_changesets {
             callback        => 1,
             until           => 0,
             reverse         => 0,
+            before_load_changeset_callback => { type => CODEREF, optional => 1},
+            reporting_callback => { type => CODEREF, optional => 1 },
+
             load_changesets => { default => 1 }
         }
     );
@@ -423,20 +426,38 @@ sub traverse_changesets {
     my @range = ( $first_rev .. $latest );
     @range = reverse @range if $args{reverse};
     for my $rev (@range) {
+
+        if ( $args{'before_load_changeset_callback'} ) {
+            my $continue = $args{'before_load_changeset_callback'}->(
+                changeset_metadata => $self->_changeset_index_entry(
+                    sequence_no => $rev,
+                )
+            );
+        }
+
         $self->log_debug("Fetching changeset $rev");
         my $data;
         if ( $args{load_changesets} ) {
             $data = $self->_load_changeset_from_db( sequence_no => $rev );
-            $args{callback}->( changeset => $data );
+            $args{callback}->( changeset =>$data);
         } else {
-            my $row = $self->_load_changeset_metadata_from_db( sequence_no => $rev );
-            $data = [ $row->{sequence_no}, $row->{original_source_uuid}, $row->{original_sequence_no}, $row->{sha1} ];
+            $data = $self->_changeset_index_entry( sequence_no => $rev);
             $args{callback}->(changeset_metadata => $data);
 
         }
+        $args{reporting_callback}->($data) if ($args{reporting_callback});
+
     }
 }
 
+sub _changeset_index_entry {
+    my $self = shift;
+    my %args = ( sequence_no => undef, @_ );
+    my $row  = $self->_load_changeset_metadata_from_db( sequence_no => $args{sequence_no} );
+    my $data = [ $row->{sequence_no}, $row->{original_source_uuid}, $row->{original_sequence_no}, $row->{sha1} ];
+    return $data;
+}
+
 
 sub read_changeset_index {
     my $self =shift;

commit fc1ae46803313c2b1f168cfa2013ca0d98752557
Author: Jesse Vincent <jesse at bestpractical.com>
Date:   Mon Jun 8 15:58:07 2009 -0400

    got merges to skip changesets without loading them if we know we've seen them; renamed a variable for clarity

diff --git a/lib/Prophet/CLI/Command/Merge.pm b/lib/Prophet/CLI/Command/Merge.pm
index ec22822..1c37595 100644
--- a/lib/Prophet/CLI/Command/Merge.pm
+++ b/lib/Prophet/CLI/Command/Merge.pm
@@ -34,9 +34,8 @@ sub run {
         from  => $self->source,
         force => $self->has_arg('force'),
     ) if ($self->source->resolution_db_handle);
-    
+   
     my $changesets = $self->_do_merge();
-
     #Prophet::CLI->start_pager();
     $self->print_report($changesets) 
 }
@@ -77,25 +76,49 @@ Returns the number of changesets merged.
 sub _do_merge {
     my ($self) = @_;
 
+    my $source_latest = $self->source->latest_sequence_no() || 0;
+    my $last_seen_from_source = $self->target->last_changeset_from_source( $self->source->uuid );
     my %import_args = (
         from  => $self->source,
         resdb => $self->resdb_handle,
+        resolver_class => $self->merge_resolver(),
         force => $self->has_arg('force'),
+        before_load_changeset_callback  => sub { 
+                my %args = (@_);
+                my ($seq, $orig_uuid, $orig_seq, $key) = @{$args{changeset_metadata}};
+                # skip changesets we've seen before
+                if (
+                $self->target->has_seen_changeset( source_uuid => $orig_uuid,
+                                           sequence_no => $orig_seq) ){
+                        return undef;
+                } else {
+                    return 1;
+                }
+
+            },
     );
 
     local $| = 1;
 
-    $import_args{resolver_class} = $self->merge_resolver();
 
     my $changesets = 0;
 
-    my $source_latest = $self->source->latest_sequence_no() || 0;
-    my $source_last_seen = $self->target->last_changeset_from_source( $self->source->uuid );
 
     if ( $self->has_arg('dry-run') ) {
 
         $self->source->traverse_changesets(
-            after    => $source_last_seen,
+            after    => $last_seen_from_source,
+            before_load_changeset_callback  => sub { 
+                my $data = shift;
+                my ($seq, $orig_uuid, $orig_seq, $key) = @$data;
+                # skip changesets we've seen before
+                if ( $self->has_seen_changeset( source_uuid => $orig_uuid, sequence_no => $orig_seq) ){
+                        return undef;
+                } else {
+                    return 1;
+                }
+
+            },
             callback => sub {
                 my %args = (@_);
                 if ( $self->target->should_accept_changeset( $args{changeset} ) ) {
@@ -107,7 +130,7 @@ sub _do_merge {
     } else {
 
         if ( $self->has_arg('verbose') ) {
-            print "Integrating changes from " . $source_last_seen . " to " . $source_latest . "\n";
+            print "Integrating changes from " . $last_seen_from_source . " to " . $source_latest . "\n";
             $import_args{reporting_callback} = sub {
                 my %args = @_;
                 print $args{changeset}->as_string;
@@ -115,11 +138,10 @@ sub _do_merge {
             };
         } else {
             $import_args{reporting_callback} = $self->progress_bar(
-                max    => ( $source_latest - $source_last_seen ),
+                max    => ( $source_latest - $last_seen_from_source ),
                 format => "%30b %p %E\r"
             );
         }
-
         $self->target->import_changesets(%import_args);
         return $changesets;
     }

-----------------------------------------------------------------------



More information about the Bps-public-commit mailing list