[Bps-public-commit] r11622 - in SVN-PropDB: . lib/Prophet

clkao at bestpractical.com clkao at bestpractical.com
Mon Apr 7 10:01:46 EDT 2008


Author: clkao
Date: Mon Apr  7 10:01:46 2008
New Revision: 11622

Modified:
   SVN-PropDB/   (props changed)
   SVN-PropDB/lib/Prophet/Replica.pm
   SVN-PropDB/lib/Prophet/Replica/HTTP.pm
   SVN-PropDB/lib/Prophet/Replica/Hiveminder.pm
   SVN-PropDB/lib/Prophet/Replica/RT.pm

Log:
merge streamy replica changes.

 r28799 at mtl:  clkao | 2008-04-07 12:14:21 +0800
 local branch
 r28800 at mtl:  clkao | 2008-04-07 12:15:06 +0800
 In base class for replica, provide traverse_new_changesets
 method for streamy traversal.
 
 r28801 at mtl:  clkao | 2008-04-07 12:23:21 +0800
 streamy traverse_changesets.
 r28815 at mtl:  clkao | 2008-04-07 15:54:22 +0800
 make traverse_new_changesets streamy and deprecates
 new_changesets_for.
 
 r28816 at mtl:  clkao | 2008-04-07 16:03:46 +0800
 port rt and hm replica to use streamy interface.
 
 r28817 at mtl:  clkao | 2008-04-07 21:55:24 +0800
 - make http source streamy
 - warn about replica implementation on streamy subclasses
 


Modified: SVN-PropDB/lib/Prophet/Replica.pm
==============================================================================
--- SVN-PropDB/lib/Prophet/Replica.pm	(original)
+++ SVN-PropDB/lib/Prophet/Replica.pm	Mon Apr  7 10:01:46 2008
@@ -77,19 +77,18 @@
 
     my $source = $args{'from'};
 
-    my $changesets_to_integrate = $source->new_changesets_for($self);
-
-    for my $changeset (@$changesets_to_integrate) {
-        $self->integrate_changeset(
-            changeset          => $changeset,
-            conflict_callback  => $args{conflict_callback},
-            reporting_callback => $args{'reporting_callback'},
-            resolver           => $args{resolver},
-            resolver_class     => $args{'resolver_class'},
-            resdb              => $args{'resdb'},
-        );
-
-    }
+    $source->traverse_new_changesets
+        ( for => $self,
+          callback => sub {
+            $self->integrate_changeset(
+                changeset          => $_[0],
+                conflict_callback  => $args{conflict_callback},
+                reporting_callback => $args{'reporting_callback'},
+                resolver           => $args{resolver},
+                resolver_class     => $args{'resolver_class'},
+                resdb              => $args{'resdb'},
+            );
+        } );
 }
 
 sub import_resolutions_from_remote_replica {
@@ -337,8 +336,39 @@
             $changeset->changes ];
 }
 
+=head2 traverse_new_changesets ( for => $replica, callback => sub { my $changeset = shift; ... } )
+
+Traverse the new changesets for C<$replica> and call C<callback> for each new changesets.
+
+XXX: this also provide hinting callbacks for the caller to know in
+advance how many changesets are there for traversal.
+
+=cut
+
+sub traverse_new_changesets {
+    my $self = shift;
+    my %args = validate(
+        @_, { for => { isa => 'Prophet::Replica' },
+              callback => 1,
+        } );
+
+    if ($self->db_uuid && $args{for}->db_uuid && $self->db_uuid ne $args{for}->db_uuid) {
+        #warn "HEY. You should not be merging between two replicas with different database uuids";
+        # XXX TODO
+    }
+
+
+    $self->traverse_changesets( after => $args{for}->last_changeset_from_source( $self->uuid ),
+                                callback => sub {
+                                    $args{callback}->($_[0])
+                                        if $self->should_send_changeset( changeset => $_[0], to => $args{for} );
+                                } );
+}
+
 =head2 news_changesets_for Prophet::Replica
 
+DEPRECATED: use traverse_new_changesets instead
+
 Returns the local changesets that have not yet been seen by the replica we're passing in.
 
 =cut
@@ -352,15 +382,12 @@
 
 sub new_changesets_for {
     my $self = shift;
-    my ($other) = validate_pos( @_, { isa => 'Prophet::Replica' } );
-    if ( $self->db_uuid && $other->db_uuid && $self->db_uuid ne $other->db_uuid ) {
+    my (  $other ) = validate_pos(@_, { isa => 'Prophet::Replica'});
 
-        #warn "HEY. You should not be merging between two replicas with different database uuids";
-        # XXX TODO
-    }
+    my @result;
+    $self->traverse_new_changesets( for => $other, callback => sub { push @result, $_[0] } );
 
-    return [ grep { $self->should_send_changeset( changeset => $_, to => $other ) }
-            @{ $self->fetch_changesets( after => $other->last_changeset_from_source( $self->uuid ) ) } ];
+    return \@result;
 }
 
 =head2 should_send_changeset { to => Prophet::Replica, changeset => Prophet::ChangeSet }
@@ -385,25 +412,36 @@
 Fetch all changesets from the source. 
         
 Returns a reference to an array of L<Prophet::ChangeSet/> objects.
+
+See also L<traverse_new_changesets> for replica implementations to provide streamly interface
         
 
 =cut    
 
-# XXX: this totally wants to get streamy and use a callback so we can integrate while fetching.
 sub fetch_changesets {
     my $self = shift;
     my %args = validate( @_, { after => 1 } );
     my @results;
 
+    $self->traverse_changesets( %args, callback => sub { push @results, $_[0] } );
+
+    return \@results;
+}
+
+sub traverse_changesets {
+    my $self = shift;
+    my %args = validate(
+        @_, { after => 1,
+              callback => 1,
+        } );
+
     my $first_rev = ( $args{'after'} + 1 ) || 1;
+    die "you must implement most_recent_changeset in ".ref($self).", or override traverse_changesets"
+        unless $self->can('most_recent_changeset');
 
-    # XXX TODO we should  be using a svn get_log call here rather than simple iteration
-    # clkao explains that this won't deal cleanly with cases where there are revision "holes"
     for my $rev ( $first_rev .. $self->most_recent_changeset ) {
-        push @results, $self->fetch_changeset($rev);
+        $args{callback}->( $self->fetch_changeset($rev) );
     }
-
-    return \@results;
 }
 
 use Path::Class;

Modified: SVN-PropDB/lib/Prophet/Replica/HTTP.pm
==============================================================================
--- SVN-PropDB/lib/Prophet/Replica/HTTP.pm	(original)
+++ SVN-PropDB/lib/Prophet/Replica/HTTP.pm	Mon Apr  7 10:01:46 2008
@@ -60,10 +60,12 @@
 
 use constant CHG_RECORD_SIZE =>  ( 4 + 16 + 4 +20 );
 
-sub fetch_changesets {
+sub traverse_changesets {
     my $self = shift;
-    my %args = validate( @_, { after => 1 } );
-    my @results;
+    my %args = validate(
+        @_, { after => 1,
+              callback => 1,
+        } );
 
     my $first_rev = ( $args{'after'} + 1 ) || 1;
 
@@ -86,11 +88,10 @@
         $changeset->sequence_no($seq);
         $changeset->original_source_uuid( $orig_uuid);
         $changeset->original_sequence_no( $orig_seq);
-        push @results, $changeset;
+        $args{callback}->($changeset);
     }
-
-    return \@results;
 }
+
 sub record_integration_changeset {
     die 'readonly';
 }

Modified: SVN-PropDB/lib/Prophet/Replica/Hiveminder.pm
==============================================================================
--- SVN-PropDB/lib/Prophet/Replica/Hiveminder.pm	(original)
+++ SVN-PropDB/lib/Prophet/Replica/Hiveminder.pm	Mon Apr  7 10:01:46 2008
@@ -72,32 +72,22 @@
 }
 
 
-
-=head2 fetch_changesets { after => SEQUENCE_NO } 
-
-Fetch all changesets from the source. 
-
-Returns a reference to an array of L<Prophet::ChangeSet/> objects.
-
-
-=cut
-
-sub fetch_changesets {
+sub traverse_changesets {
     my $self = shift;
-    my %args = validate( @_, { after => 1 } );
+    my %args = validate(
+        @_, { after => 1,
+              callback => 1,
+        } );
 
     my $first_rev = ( $args{'after'} + 1 ) || 1;
 
-    my @changesets;
-    my %tix;
     my $recoder = Prophet::Replica::Hiveminder::PullEncoder->new( { sync_source => $self } );
-    for my $task ( @{$self->find_matching_tasks} ) {
-        push @changesets, @{ $recoder->run(
+    for my $task ( @{ $self->find_matching_tasks } ) {
+        $args{callback}->($_)
+            for @{ $recoder->run(
                 task => $task,
-                transactions => $self->find_matching_transactions( task => $task->{id}, starting_transaction => $first_rev )) };
+                transactions => $self->find_matching_transactions( task => $task->{id}, starting_transaction => $first_rev ) ) };
     }
-
-    return [ sort { $a->original_sequence_no <=> $b->original_sequence_no } @changesets ];
 }
 
 sub find_matching_tasks {

Modified: SVN-PropDB/lib/Prophet/Replica/RT.pm
==============================================================================
--- SVN-PropDB/lib/Prophet/Replica/RT.pm	(original)
+++ SVN-PropDB/lib/Prophet/Replica/RT.pm	Mon Apr  7 10:01:46 2008
@@ -315,39 +315,25 @@
 
 }
 
-
-
-=head2 fetch_changesets { after => SEQUENCE_NO } 
-
-Fetch all changesets from the source. 
-
-Returns a reference to an array of L<Prophet::ChangeSet/> objects.
-
-
-=cut
-
-sub fetch_changesets {
+sub traverse_changesets {
     my $self = shift;
-    my %args = validate( @_, { after => 1 } );
+    my %args = validate(
+        @_, { after => 1,
+              callback => 1,
+        } );
 
     my $first_rev = ( $args{'after'} + 1 ) || 1;
 
-    my @changesets;
-    my %tix;
     my $recoder = Prophet::Replica::RT::PullEncoder->new( { sync_source => $self } );
     for my $id ( $self->find_matching_tickets ) {
 
         # XXX: _recode_transactions should ignore txn-id <= $first_rev
-        push @changesets,
-            @{
-            $recoder->run(
+        $args{callback}->($_)
+            for @{ $recoder->run(
                 ticket => $self->rt->show( type => 'ticket', id => $id ),
                 transactions => $self->find_matching_transactions( ticket => $id, starting_transaction => $first_rev )
-            )
-            };
+                ) };
     }
-
-    return [ sort { $a->original_sequence_no <=> $b->original_sequence_no } @changesets ];
 }
 
 sub find_matching_tickets {



More information about the Bps-public-commit mailing list