[Bps-public-commit] r15622 - in Prophet/trunk: . lib/Prophet lib/Prophet/CLI/Command lib/Prophet/Replica lib/Prophet/Resolver

jesse at bestpractical.com jesse at bestpractical.com
Thu Aug 28 21:16:20 EDT 2008


Author: jesse
Date: Thu Aug 28 21:16:18 2008
New Revision: 15622

Modified:
   Prophet/trunk/   (props changed)
   Prophet/trunk/lib/Prophet/CLI/Command/Merge.pm
   Prophet/trunk/lib/Prophet/Replica.pm
   Prophet/trunk/lib/Prophet/Replica/prophet.pm
   Prophet/trunk/lib/Prophet/Resolver/IdenticalChanges.pm
   Prophet/trunk/t/simple-push.t

Log:


Modified: Prophet/trunk/lib/Prophet/CLI/Command/Merge.pm
==============================================================================
--- Prophet/trunk/lib/Prophet/CLI/Command/Merge.pm	(original)
+++ Prophet/trunk/lib/Prophet/CLI/Command/Merge.pm	Thu Aug 28 21:16:18 2008
@@ -31,6 +31,14 @@
 
     my $changesets = $self->_do_merge( $source, $target );
 
+    $self->print_report($changesets);
+}
+
+
+sub print_report {
+    my $self = shift;
+    my $changesets = shift;
+    print "\n";
     if ($changesets == 0) {
         print "No new changesets.\n";
     }
@@ -62,6 +70,58 @@
 sub _do_merge {
     my ( $self, $source, $target ) = @_;
 
+    my %import_args = (
+        from  => $source,
+        resdb => $self->resdb_handle,
+        force => $self->has_arg('force'),
+    );
+
+    local $| = 1;
+
+    $self->validate_merge_replicas($source => $target);
+
+    $import_args{resolver_class} = $self->merge_resolver();
+
+    my $changesets = 0;
+
+    my $source_latest = $source->latest_sequence_no();
+    my $source_last_seen = $target->last_changeset_from_source($source->uuid);
+
+    if( $self->has_arg('verbose') ) {
+        print "Integrating changes from ".$source_last_seen . " to ". $source_latest."\n";
+    }
+
+
+    if( $self->has_arg('verbose') ) {
+        $import_args{reporting_callback} = sub {
+            my %args = @_;
+            print $args{changeset}->as_string;
+            $changesets++;
+        };
+    } else {
+        require Time::Progress;
+        my $progress = Time::Progress->new();
+        $progress->attr( max => ($source_latest - $source_last_seen));
+
+        $import_args{reporting_callback} = sub {
+            my %args = @_;
+            $changesets++;
+            print $progress->report( "%30b %p %E // ". ($args{changeset}->created || 'Undated'). " " .(sprintf("%-12s",$args{changeset}->creator||'')) ."\r" , $changesets);
+
+        };
+
+    }
+
+    $target->import_changesets( %import_args);
+    return $changesets;
+}
+
+
+sub validate_merge_replicas {
+    my $self = shift;
+    my $source = shift;
+    my $target = shift;
+
     if ( $target->uuid eq $source->uuid ) {
         $self->fatal_error(
                   "You appear to be trying to merge two identical replicas. "
@@ -74,43 +134,21 @@
                 . " does not accept changesets. Perhaps it's unwritable."
         );
     }
+}
 
-    my $prefer = $self->arg('prefer') || 'none';
-
-    my $resolver = $ENV{'PROPHET_RESOLVER'}
-                   ? 'Prophet::Resolver::' . $ENV{'PROPHET_RESOLVER'}
-                 : $prefer eq 'to'
-                   ? 'Prophet::Resolver::AlwaysTarget'
-                 : $prefer eq 'from'
-                   ? 'Prophet::Resolver::AlwaysSource'
-                   : ();
-
-    my %import_args = (
-        from  => $source,
-        resdb => $self->resdb_handle,
-        force => $self->has_arg('force'),
-    );
-
-    $import_args{resolver_class} = $resolver
-        if $resolver;
-
-    my $changesets = 0;
-    my $verbose = $self->has_arg('verbose');
-
-    $import_args{reporting_callback} = sub {
-        my %args = @_;
-        my $changeset = $args{changeset};
-        print $changeset->as_string if $verbose;
-        $changesets++;
-    };
+sub merge_resolver {
+    my $self = shift;
 
-    $target->import_changesets(
-        %import_args,
-    );
+    my $prefer = $self->arg('prefer') || 'none';
 
-    return $changesets;
+    my $resolver = $ENV{'PROPHET_RESOLVER'} ? 'Prophet::Resolver::' . $ENV{'PROPHET_RESOLVER'}
+        : $prefer eq 'to'   ? 'Prophet::Resolver::AlwaysTarget'
+        : $prefer eq 'from' ? 'Prophet::Resolver::AlwaysSource'
+        :                     ();
+    return $resolver;
 }
 
+
 __PACKAGE__->meta->make_immutable;
 no Moose;
 

Modified: Prophet/trunk/lib/Prophet/Replica.pm
==============================================================================
--- Prophet/trunk/lib/Prophet/Replica.pm	(original)
+++ Prophet/trunk/lib/Prophet/Replica.pm	Thu Aug 28 21:16:18 2008
@@ -317,9 +317,10 @@
 
     return;
 }
+
 =head3 last_changeset_from_source $SOURCE_UUID
 
-Returns the last changeset id seen from the source identified by $SOURCE_UUID.
+Returns the last changeset id seen from the replica identified by $SOURCE_UUID.
 
 =cut
 
@@ -455,55 +456,36 @@
         }
     );
 
-    if ( $self->db_uuid && $args{for}->db_uuid && $self->db_uuid ne $args{for}->db_uuid ) {
-        unless ($args{'force'}) {
-            die "You are trying to merge two different databases! This is NOT\n".
-            "recommended. If you really want to do this,  add '--force' to\n".
-            "your commandline.\n\n"
-            . "Local database:  " . $self->db_uuid      . "\n"
-            . "Remote database: " . $args{for}->db_uuid . "\n";
-        }
-    }
-
-
-    $self->log("Evaluating changesets to apply to ".substr($args{'for'}->uuid,0,6). " starting with ".  $args{for}->last_changeset_from_source( $self->uuid ));
+    $self->_check_db_uuids_on_merge(for => $args{for}, force => $args{'force'});
 
-
-    my $callback = $args{callback};
     $self->traverse_changesets(
         after    => $args{for}->last_changeset_from_source( $self->uuid ),
-        callback => sub {
-            $callback->( $_[0] )
-                if $self->should_send_changeset( changeset => $_[0], to => $args{for} );
+        callback => sub { $args{callback}->( $_[0] ) if $self->should_send_changeset( changeset => $_[0], to        => $args{for});
         }
     );
 }
 
-=head2 new_changesets_for Prophet::Replica
-
-DEPRECATED: use traverse_new_changesets instead
-
-Returns the local changesets that have not yet been seen by the replica we're passing in.
-
-=cut
-
-
-sub new_changesets_for {
+sub _check_db_uuids_on_merge {
     my $self = shift;
-
-    # the first argument is always the replica
-    unshift @_, 'replica';
-    my %args = validate(@_, {
-        replica  => { isa => 'Prophet::Replica' },
-        force    => 0,
-    });
-
-    my @result;
-    $self->traverse_new_changesets( for => $args{replica}, callback => sub { push @result, $_[0] }, force => $args{force} );
-
-    return \@result;
+    my %args = validate( @_,
+        {   for   => { isa => 'Prophet::Replica' },
+            force => 0,
+        });
+    if (   $self->db_uuid && $args{for}->db_uuid
+        && $self->db_uuid ne $args{for}->db_uuid ) {
+        unless ( $args{'force'} ) {
+            die "You are trying to merge two different databases! This is NOT\n"
+                . "recommended. If you really want to do this,  add '--force' to\n"
+                . "your commandline.\n\n"
+                . "Local database:  "
+                . $self->db_uuid . "\n"
+                . "Remote database: "
+                . $args{for}->db_uuid . "\n";
+        }
+    }
 }
 
+
 =head3 should_send_changeset { to => L<Prophet::Replica>, changeset => L<Prophet::ChangeSet> }
 
 Returns true if the replica C<to> hasn't yet seen the changeset C<changeset>.
@@ -527,7 +509,7 @@
 
 =head3 fetch_changesets { after => SEQUENCE_NO }
 
-Fetch all changesets from the source.
+Fetch all changesets from this replica after the local sequence number SEQUENCE_NO.
 
 Returns a reference to an array of L<Prophet::ChangeSet/> objects.
 

Modified: Prophet/trunk/lib/Prophet/Replica/prophet.pm
==============================================================================
--- Prophet/trunk/lib/Prophet/Replica/prophet.pm	(original)
+++ Prophet/trunk/lib/Prophet/Replica/prophet.pm	Thu Aug 28 21:16:18 2008
@@ -434,8 +434,7 @@
     # XXX TODO: skip if the index already has this version of the record;
     # XXX TODO FETCH THAT
     my $record_last_changed_changeset = $args{'changeset_id'} || 0;
-    my $index_row
-        = pack( 'NH40', $record_last_changed_changeset, $args{cas_key} );
+    my $index_row = pack( 'NH40', $record_last_changed_changeset, $args{cas_key} );
     print $record_index $index_row || die $!;
     close $record_index;
 }

Modified: Prophet/trunk/lib/Prophet/Resolver/IdenticalChanges.pm
==============================================================================
--- Prophet/trunk/lib/Prophet/Resolver/IdenticalChanges.pm	(original)
+++ Prophet/trunk/lib/Prophet/Resolver/IdenticalChanges.pm	Thu Aug 28 21:16:18 2008
@@ -48,7 +48,9 @@
         next if ((!defined $prop_change->target_value || $prop_change->target_value  eq '')
                 
                 && ( !defined $prop_change->source_new_value || $prop_change->source_new_value eq ''));
-        next if $prop_change->target_value eq $prop_change->source_new_value;
+        next if (defined  $prop_change->target_value 
+        and defined $prop_change->source_new_value
+            and ( $prop_change->target_value eq $prop_change->source_new_value));
         return 0; 
     }
 

Modified: Prophet/trunk/t/simple-push.t
==============================================================================
--- Prophet/trunk/t/simple-push.t	(original)
+++ Prophet/trunk/t/simple-push.t	Thu Aug 28 21:16:18 2008
@@ -45,7 +45,18 @@
 
 };
 
-my $changesets =   [ map { $_->as_hash } grep { $_->has_changes }  @{$bob->new_changesets_for($alice, force => 1)}];
+my $changesets;
+    $bob->traverse_new_changesets( for => $alice, force => 1,
+            callback => sub {
+                my $cs = shift;
+                return unless $cs->has_changes,
+                push @{$changesets}, $cs->as_hash;
+            }
+        
+        
+    );
+
+
 my $seq = delete $changesets->[0]->{'sequence_no'};
 my $orig_seq = delete $changesets->[0]->{'original_sequence_no'};
 is($seq, $orig_seq);



More information about the Bps-public-commit mailing list