[Bps-public-commit] Prophet - A disconnected, replicated p2p database branch, master, updated. 30be47aae2fb7f28a26246059363568269ccb799
jesse
jesse at bestpractical.com
Mon Jan 26 12:54:26 EST 2009
The branch, master, has been updated
via 30be47aae2fb7f28a26246059363568269ccb799 (commit)
via 4811a0eaf7346f123ba83935d886d0f242b5455b (commit)
from 6ea7cc2620d50f68f2523f758e6bf8d1af892126 (commit)
Summary of changes:
lib/Prophet/ForeignReplica.pm | 36 ++++++++++-------
lib/Prophet/Replica.pm | 80 ++++++++++++----------------------------
lib/Prophet/Replica/prophet.pm | 4 +-
lib/Prophet/Replica/sqlite.pm | 3 +-
t/simple-push.t | 17 +++++---
5 files changed, 57 insertions(+), 83 deletions(-)
- Log -----------------------------------------------------------------
commit 4811a0eaf7346f123ba83935d886d0f242b5455b
Author: Jesse Vincent <jesse at bestpractical.com>
Date: Mon Jan 26 12:51:08 2009 -0500
Improve simple_push tests to cope with upcoming changes to how we skip changesets on pull
diff --git a/t/simple-push.t b/t/simple-push.t
index abf3b94..72b891a 100644
--- a/t/simple-push.t
+++ b/t/simple-push.t
@@ -68,6 +68,7 @@ as_bob {
$alice->uuid
)
);
+ diag("As alice, my latest sequence # is " .$alice->latest_sequence_no);
is( $bob->last_changeset_from_source( $alice->uuid ) =>
$alice->latest_sequence_no );
@@ -79,17 +80,16 @@ $bob->traverse_new_changesets(
force => 1,
callback => sub {
my $cs = shift;
- return unless $cs->has_changes, push @{$changesets}, $cs->as_hash;
+ return unless $bob->should_send_changeset( changeset => $cs, to => $alice);
+ return unless $cs->has_changes;
+ push @{$changesets}, $cs->as_hash;
}
);
my $seq = delete $changesets->[0]->{'sequence_no'};
my $orig_seq = delete $changesets->[0]->{'original_sequence_no'};
is( $seq, $orig_seq );
-
-is_deeply(
- $changesets,
- [ { #'sequence_no' => 3,
+my $cs_data = [ { #'sequence_no' => 3,
#'original_sequence_no' => 3, # the number is different on different replica types
'creator' => 'bob',
'created' => $changesets->[0]->{created},
@@ -122,8 +122,11 @@ is_deeply(
},
'is_nullification' => undef,
}
- ]
-);
+ ];
+
+
+is_deeply( $changesets,$cs_data);
+
# Alice syncs
commit 30be47aae2fb7f28a26246059363568269ccb799
Author: Jesse Vincent <jesse at bestpractical.com>
Date: Mon Jan 26 12:52:41 2009 -0500
Get rid of state databases in favor of storing local metadata in a bag on the side
diff --git a/lib/Prophet/ForeignReplica.pm b/lib/Prophet/ForeignReplica.pm
index 552c4bc..d1b43d9 100644
--- a/lib/Prophet/ForeignReplica.pm
+++ b/lib/Prophet/ForeignReplica.pm
@@ -12,21 +12,19 @@ This abstract baseclass implements the helpers you need to be able to easily syn
=cut
-sub BUILD {
- my $self = shift;
- my $state_handle_url = $self->app_handle->handle->url;
- $self->log( "Connecting to state database ".$state_handle_url);
- $self->state_handle(
- Prophet::Replica->get_handle(
- url => $state_handle_url,
- db_uuid => $self->state_db_uuid,
- app_handle => $self->app_handle
-
- ));
-}
+sub fetch_local_metadata { my $self = shift;
+ my $key = shift;
+ $self->app_handle->handle->fetch_local_metadata( $self->uuid . "-".$key )
+
+ }
+sub store_local_metadata { my $self = shift;
+ my $key = shift;
+ my $value = shift;
+ $self->app_handle->handle->store_local_metadata( $self->uuid."-".$key => $value);
+
+
+ }
-sub fetch_local_metadata { shift->state_handle->fetch_local_metadata(@_)}
-sub store_local_metadata { shift->state_handle->store_local_metadata(@_)}
@@ -107,7 +105,15 @@ sub has_seen_changeset {
# Has our host replica given this changeset to us yet?
# XXX TODO - should actually be checking the changeset id and the record id in a lookup table
# of all the changesets that may have come from the source
- return $self->last_changeset_from_source($changeset->source_uuid) >= $changeset->sequence_no;
+ #
+ if ($changeset->original_source_uuid eq $self->uuid) { return 1}
+
+ if ($self->last_changeset_from_source($changeset->original_source_uuid) >= $changeset->original_sequence_no) {
+ # XXX TODO - don't need this, right? || $self->last_changeset_from_source($changeset->source_uuid) >= $changeset->sequence_no ) {
+ return 1;
+ } else {
+ return 0;
+ }
}
sub log {
diff --git a/lib/Prophet/Replica.pm b/lib/Prophet/Replica.pm
index fd6ab52..ef18b77 100644
--- a/lib/Prophet/Replica.pm
+++ b/lib/Prophet/Replica.pm
@@ -7,11 +7,6 @@ use constant state_db_uuid => 'state';
use Prophet::App;
-has state_handle => (
- is => 'rw',
- isa => 'Prophet::Replica',
-);
-
has metadata_store => (
is => 'rw',
isa => 'Prophet::MetadataStore',
@@ -31,12 +26,6 @@ has is_resdb => (
documentation => 'Whether this replica is a resolution db or not.'
);
-has is_state_handle => (
- is => 'rw',
- isa => 'Bool',
- documentation => 'Whether this replica is a state handle or not.',
-);
-
has db_uuid => (
is => 'rw',
isa => 'Str',
@@ -174,8 +163,6 @@ sub import_changesets {
my $source = $args{'from'};
- # they have no changes, that means they're probably creating a new replica
- # of a database, so copy the db_uuid
warn "The source does not exist" unless ($source->replica_exists);
$source->traverse_new_changesets(
@@ -260,6 +247,7 @@ sub integrate_changeset {
my $changeset = $args{'changeset'};
+
$self->log_debug("Considering changeset ".$changeset->original_sequence_no .
" from " . $self->display_name_for_uuid($changeset->original_source_uuid));
@@ -274,13 +262,18 @@ sub integrate_changeset {
# - merge tickets for the same
# we'll want to skip or remove those changesets
- return if $changeset->original_source_uuid eq $self->uuid;
- return if ($changeset->is_nullification);
- $self->remove_redundant_data($changeset); #Things we have already seen
- return unless $changeset->has_changes;
+ if ( $changeset->is_nullification || !$changeset->has_changes ) {
+ # if it's a changeset we don't care about, mark it as seen and move on
+ $self->record_integration_of_changeset($changeset);
+ return;
+
+ } elsif ( $self->has_seen_changeset($changeset) ) {
+ $self->record_integration_of_changeset($changeset);
+ return;
+ }
- if ( my $conflict = $self->conflicts_from_changeset($changeset) ) {
+ elsif ( my $conflict = $self->conflicts_from_changeset($changeset) ) {
$self->log_debug("Integrating conflicting changeset ".$changeset->original_sequence_no . " from " . $self->display_name_for_uuid($changeset->original_source_uuid));
$args{conflict_callback}->($conflict) if $args{'conflict_callback'};
$conflict->resolvers( [ sub { $args{resolver}->(@_) } ] ) if $args{resolver};
@@ -311,12 +304,13 @@ sub integrate_changeset {
$args{'reporting_callback'}->( changeset => $changeset,
conflict => $conflict ) if ( $args{'reporting_callback'} );
-
+ return 1;
} else {
$self->log_debug("Integrating changeset ".$changeset->original_sequence_no .
" from " . $self->display_name_for_uuid($changeset->original_source_uuid));
$self->record_changeset_and_integration($changeset);
$args{'reporting_callback'}->( changeset => $changeset ) if ( $args{'reporting_callback'} );
+ return 1;
}
}
@@ -436,37 +430,6 @@ sub conflicts_from_changeset {
return $conflict;
}
-=head3 remove_redundant_data L<Prophet::Changeset>
-
-Prunes unnecessary data from this changeset as a sanity check (merge tickets
-from foreign replicas, resolution records in non-resolution databases, changes
-we've already seen).
-
-=cut
-
-sub remove_redundant_data {
- my ( $self, $changeset ) = @_;
-
- my @new_changes;
- for my $change ($changeset->changes) {
- # when would we run into resolution records in a nonresdb? XXX
- next if ($change->record_type eq '_prophet_resolution' && !$self->is_resdb);
-
- # never integrate a merge ticket that comes from a foreign database.
- # implict merge tickets are the devil and are lies. Merge tickets are
- # always generated locally by importing a change that originated on
- # that replica.
- # (The actual annoying technical problem is that the
- # locally created merge ticket is written out in a separate
- # transaction at ~ at the same time as the original imported one is
- # being written. This makes svn go boom.)
- next if ( $change->record_type eq $MERGETICKET_METATYPE); # && $change->record_uuid eq $self->uuid );
- push (@new_changes, $change);
- }
-
- $changeset->changes(\@new_changes);
-}
-
=head3 traverse_new_changesets ( for => $replica, callback => sub { my $changeset = shift; ... } )
Traverse the new changesets for C<$replica> and call C<callback> for each new
@@ -492,9 +455,7 @@ sub traverse_new_changesets {
$self->traverse_changesets(
after => $args{for}->last_changeset_from_source( $self->uuid ),
callback => sub {
- $args{callback}->( $_[0] ) if $self->should_send_changeset(
- changeset => $_[0], to => $args{for}
- );
+ $args{callback}->( $_[0] ) if $self->should_send_changeset( changeset => $_[0], to => $args{for});
}
);
}
@@ -936,10 +897,17 @@ sub record_integration_of_changeset {
my $self = shift;
my ($changeset) = validate_pos( @_, { isa => 'Prophet::ChangeSet' } );
- # Record a merge ticket for the changeset's "original" source
- return $self->store_local_metadata('last-changeset-from-'. $changeset->original_source_uuid => $changeset->original_sequence_no);
-
+ if ( $changeset->original_source_uuid ne $self->uuid &&
+ ( $self->last_changeset_from_source( $changeset->original_source_uuid ) < $changeset->original_sequence_no ) )
+ {
+ return $self->store_local_metadata( 'last-changeset-from-' . $changeset->original_source_uuid => $changeset->original_sequence_no );
+ }
+ if ( $self->last_changeset_from_source( $changeset->source_uuid ) < $changeset->sequence_no ) {
+ return $self->store_local_metadata( 'last-changeset-from-' . $changeset->source_uuid => $changeset->sequence_no );
+ }
}
+
+
=head2 routines which need to be implemented by any Prophet backend store
=head3 uuid
diff --git a/lib/Prophet/Replica/prophet.pm b/lib/Prophet/Replica/prophet.pm
index dc8f864..9c0f6c7 100644
--- a/lib/Prophet/Replica/prophet.pm
+++ b/lib/Prophet/Replica/prophet.pm
@@ -62,7 +62,7 @@ has '+resolution_db_handle' => (
lazy => 1,
default => sub {
my $self = shift;
- return if $self->is_resdb || $self->is_state_handle;
+ return if $self->is_resdb ;
return Prophet::Replica->get_handle(
{ url => "prophet:" . $self->url . '/resolutions',
app_handle => $self->app_handle,
@@ -220,8 +220,6 @@ sub BUILD {
}
-sub state_handle { return shift; }
-
=head2 replica_exists
Returns true if the replica already exists / has been initialized.
diff --git a/lib/Prophet/Replica/sqlite.pm b/lib/Prophet/Replica/sqlite.pm
index 5d65d05..cff2d35 100644
--- a/lib/Prophet/Replica/sqlite.pm
+++ b/lib/Prophet/Replica/sqlite.pm
@@ -69,7 +69,7 @@ has '+resolution_db_handle' => (
lazy => 1,
default => sub {
my $self = shift;
- return if $self->is_resdb || $self->is_state_handle;
+ return if $self->is_resdb ;
return Prophet::Replica->get_handle(
{ url => $self->url . '/resolutions',
app_handle => $self->app_handle,
@@ -106,7 +106,6 @@ sub _check_for_upgrades {
-sub state_handle { return shift; }
sub __fetch_data {
my $self = shift;
-----------------------------------------------------------------------
More information about the Bps-public-commit
mailing list