[Bps-public-commit] r11622 - in SVN-PropDB: . lib/Prophet
clkao at bestpractical.com
clkao at bestpractical.com
Mon Apr 7 10:01:46 EDT 2008
Author: clkao
Date: Mon Apr 7 10:01:46 2008
New Revision: 11622
Modified:
SVN-PropDB/ (props changed)
SVN-PropDB/lib/Prophet/Replica.pm
SVN-PropDB/lib/Prophet/Replica/HTTP.pm
SVN-PropDB/lib/Prophet/Replica/Hiveminder.pm
SVN-PropDB/lib/Prophet/Replica/RT.pm
Log:
merge streamy replica changes.
r28799 at mtl: clkao | 2008-04-07 12:14:21 +0800
local branch
r28800 at mtl: clkao | 2008-04-07 12:15:06 +0800
In base class for replica, provide traverse_new_changesets
method for streamy traversal.
r28801 at mtl: clkao | 2008-04-07 12:23:21 +0800
streamy traverse_changesets.
r28815 at mtl: clkao | 2008-04-07 15:54:22 +0800
make traverse_new_changesets streamy and deprecates
new_changesets_for.
r28816 at mtl: clkao | 2008-04-07 16:03:46 +0800
port rt and hm replica to use streamy interface.
r28817 at mtl: clkao | 2008-04-07 21:55:24 +0800
- make http source streamy
- warn about replica implementation on streamy subclasses
Modified: SVN-PropDB/lib/Prophet/Replica.pm
==============================================================================
--- SVN-PropDB/lib/Prophet/Replica.pm (original)
+++ SVN-PropDB/lib/Prophet/Replica.pm Mon Apr 7 10:01:46 2008
@@ -77,19 +77,18 @@
my $source = $args{'from'};
- my $changesets_to_integrate = $source->new_changesets_for($self);
-
- for my $changeset (@$changesets_to_integrate) {
- $self->integrate_changeset(
- changeset => $changeset,
- conflict_callback => $args{conflict_callback},
- reporting_callback => $args{'reporting_callback'},
- resolver => $args{resolver},
- resolver_class => $args{'resolver_class'},
- resdb => $args{'resdb'},
- );
-
- }
+ $source->traverse_new_changesets
+ ( for => $self,
+ callback => sub {
+ $self->integrate_changeset(
+ changeset => $_[0],
+ conflict_callback => $args{conflict_callback},
+ reporting_callback => $args{'reporting_callback'},
+ resolver => $args{resolver},
+ resolver_class => $args{'resolver_class'},
+ resdb => $args{'resdb'},
+ );
+ } );
}
sub import_resolutions_from_remote_replica {
@@ -337,8 +336,39 @@
$changeset->changes ];
}
+=head2 traverse_new_changesets ( for => $replica, callback => sub { my $changeset = shift; ... } )
+
+Traverse the new changesets for C<$replica> and call C<callback> for each new changesets.
+
+XXX: this also provide hinting callbacks for the caller to know in
+advance how many changesets are there for traversal.
+
+=cut
+
+sub traverse_new_changesets {
+ my $self = shift;
+ my %args = validate(
+ @_, { for => { isa => 'Prophet::Replica' },
+ callback => 1,
+ } );
+
+ if ($self->db_uuid && $args{for}->db_uuid && $self->db_uuid ne $args{for}->db_uuid) {
+ #warn "HEY. You should not be merging between two replicas with different database uuids";
+ # XXX TODO
+ }
+
+
+ $self->traverse_changesets( after => $args{for}->last_changeset_from_source( $self->uuid ),
+ callback => sub {
+ $args{callback}->($_[0])
+ if $self->should_send_changeset( changeset => $_[0], to => $args{for} );
+ } );
+}
+
=head2 news_changesets_for Prophet::Replica
+DEPRECATED: use traverse_new_changesets instead
+
Returns the local changesets that have not yet been seen by the replica we're passing in.
=cut
@@ -352,15 +382,12 @@
sub new_changesets_for {
my $self = shift;
- my ($other) = validate_pos( @_, { isa => 'Prophet::Replica' } );
- if ( $self->db_uuid && $other->db_uuid && $self->db_uuid ne $other->db_uuid ) {
+ my ( $other ) = validate_pos(@_, { isa => 'Prophet::Replica'});
- #warn "HEY. You should not be merging between two replicas with different database uuids";
- # XXX TODO
- }
+ my @result;
+ $self->traverse_new_changesets( for => $other, callback => sub { push @result, $_[0] } );
- return [ grep { $self->should_send_changeset( changeset => $_, to => $other ) }
- @{ $self->fetch_changesets( after => $other->last_changeset_from_source( $self->uuid ) ) } ];
+ return \@result;
}
=head2 should_send_changeset { to => Prophet::Replica, changeset => Prophet::ChangeSet }
@@ -385,25 +412,36 @@
Fetch all changesets from the source.
Returns a reference to an array of L<Prophet::ChangeSet/> objects.
+
+See also L<traverse_new_changesets> for replica implementations to provide streamly interface
=cut
-# XXX: this totally wants to get streamy and use a callback so we can integrate while fetching.
sub fetch_changesets {
my $self = shift;
my %args = validate( @_, { after => 1 } );
my @results;
+ $self->traverse_changesets( %args, callback => sub { push @results, $_[0] } );
+
+ return \@results;
+}
+
+sub traverse_changesets {
+ my $self = shift;
+ my %args = validate(
+ @_, { after => 1,
+ callback => 1,
+ } );
+
my $first_rev = ( $args{'after'} + 1 ) || 1;
+ die "you must implement most_recent_changeset in ".ref($self).", or override traverse_changesets"
+ unless $self->can('most_recent_changeset');
- # XXX TODO we should be using a svn get_log call here rather than simple iteration
- # clkao explains that this won't deal cleanly with cases where there are revision "holes"
for my $rev ( $first_rev .. $self->most_recent_changeset ) {
- push @results, $self->fetch_changeset($rev);
+ $args{callback}->( $self->fetch_changeset($rev) );
}
-
- return \@results;
}
use Path::Class;
Modified: SVN-PropDB/lib/Prophet/Replica/HTTP.pm
==============================================================================
--- SVN-PropDB/lib/Prophet/Replica/HTTP.pm (original)
+++ SVN-PropDB/lib/Prophet/Replica/HTTP.pm Mon Apr 7 10:01:46 2008
@@ -60,10 +60,12 @@
use constant CHG_RECORD_SIZE => ( 4 + 16 + 4 +20 );
-sub fetch_changesets {
+sub traverse_changesets {
my $self = shift;
- my %args = validate( @_, { after => 1 } );
- my @results;
+ my %args = validate(
+ @_, { after => 1,
+ callback => 1,
+ } );
my $first_rev = ( $args{'after'} + 1 ) || 1;
@@ -86,11 +88,10 @@
$changeset->sequence_no($seq);
$changeset->original_source_uuid( $orig_uuid);
$changeset->original_sequence_no( $orig_seq);
- push @results, $changeset;
+ $args{callback}->($changeset);
}
-
- return \@results;
}
+
sub record_integration_changeset {
die 'readonly';
}
Modified: SVN-PropDB/lib/Prophet/Replica/Hiveminder.pm
==============================================================================
--- SVN-PropDB/lib/Prophet/Replica/Hiveminder.pm (original)
+++ SVN-PropDB/lib/Prophet/Replica/Hiveminder.pm Mon Apr 7 10:01:46 2008
@@ -72,32 +72,22 @@
}
-
-=head2 fetch_changesets { after => SEQUENCE_NO }
-
-Fetch all changesets from the source.
-
-Returns a reference to an array of L<Prophet::ChangeSet/> objects.
-
-
-=cut
-
-sub fetch_changesets {
+sub traverse_changesets {
my $self = shift;
- my %args = validate( @_, { after => 1 } );
+ my %args = validate(
+ @_, { after => 1,
+ callback => 1,
+ } );
my $first_rev = ( $args{'after'} + 1 ) || 1;
- my @changesets;
- my %tix;
my $recoder = Prophet::Replica::Hiveminder::PullEncoder->new( { sync_source => $self } );
- for my $task ( @{$self->find_matching_tasks} ) {
- push @changesets, @{ $recoder->run(
+ for my $task ( @{ $self->find_matching_tasks } ) {
+ $args{callback}->($_)
+ for @{ $recoder->run(
task => $task,
- transactions => $self->find_matching_transactions( task => $task->{id}, starting_transaction => $first_rev )) };
+ transactions => $self->find_matching_transactions( task => $task->{id}, starting_transaction => $first_rev ) ) };
}
-
- return [ sort { $a->original_sequence_no <=> $b->original_sequence_no } @changesets ];
}
sub find_matching_tasks {
Modified: SVN-PropDB/lib/Prophet/Replica/RT.pm
==============================================================================
--- SVN-PropDB/lib/Prophet/Replica/RT.pm (original)
+++ SVN-PropDB/lib/Prophet/Replica/RT.pm Mon Apr 7 10:01:46 2008
@@ -315,39 +315,25 @@
}
-
-
-=head2 fetch_changesets { after => SEQUENCE_NO }
-
-Fetch all changesets from the source.
-
-Returns a reference to an array of L<Prophet::ChangeSet/> objects.
-
-
-=cut
-
-sub fetch_changesets {
+sub traverse_changesets {
my $self = shift;
- my %args = validate( @_, { after => 1 } );
+ my %args = validate(
+ @_, { after => 1,
+ callback => 1,
+ } );
my $first_rev = ( $args{'after'} + 1 ) || 1;
- my @changesets;
- my %tix;
my $recoder = Prophet::Replica::RT::PullEncoder->new( { sync_source => $self } );
for my $id ( $self->find_matching_tickets ) {
# XXX: _recode_transactions should ignore txn-id <= $first_rev
- push @changesets,
- @{
- $recoder->run(
+ $args{callback}->($_)
+ for @{ $recoder->run(
ticket => $self->rt->show( type => 'ticket', id => $id ),
transactions => $self->find_matching_transactions( ticket => $id, starting_transaction => $first_rev )
- )
- };
+ ) };
}
-
- return [ sort { $a->original_sequence_no <=> $b->original_sequence_no } @changesets ];
}
sub find_matching_tickets {
More information about the Bps-public-commit
mailing list