[Bps-public-commit] SD branch, master, updated. 11425496351bfe55d6967a37d3dd4df8be2d54e5
jesse
jesse at bestpractical.com
Mon May 18 09:09:20 EDT 2009
The branch, master has been updated
via 11425496351bfe55d6967a37d3dd4df8be2d54e5 (commit)
via d1961a07c10f425e03d9e0b7d77e51cf399e5eaa (commit)
from 51a4c8f634892f633fced404cb389f48406cde22 (commit)
Summary of changes:
lib/App/SD/ForeignReplica/PullEncoder.pm | 76 +++++++++++++++++++++++++
lib/App/SD/Replica/rt/PullEncoder.pm | 89 +++++-------------------------
lib/App/SD/Replica/trac/PullEncoder.pm | 82 +++-------------------------
3 files changed, 98 insertions(+), 149 deletions(-)
- Log -----------------------------------------------------------------
commit d1961a07c10f425e03d9e0b7d77e51cf399e5eaa
Author: Jesse Vincent <jesse at bestpractical.com>
Date: Fri May 15 14:38:47 2009 -0400
Extract the 'run' method from the trac and RT PullEncoders
diff --git a/lib/App/SD/ForeignReplica/PullEncoder.pm b/lib/App/SD/ForeignReplica/PullEncoder.pm
index 20448f4..33d5e01 100644
--- a/lib/App/SD/ForeignReplica/PullEncoder.pm
+++ b/lib/App/SD/ForeignReplica/PullEncoder.pm
@@ -1,6 +1,82 @@
package App::SD::ForeignReplica::PullEncoder;
use Any::Moose;
use App::SD::Util;
+use Params::Validate qw/validate/;
+
+sub run {
+ my $self = shift;
+ my %args = validate( @_, { after => 1, callback => 1, });
+
+ $self->sync_source->log('Finding matching tickets');
+
+ my @tickets = $self->find_matching_tickets( query => $self->sync_source->query );
+
+ if ( @tickets == 0 ) {
+ $self->sync_source->log("No tickets found.");
+ return;
+ }
+
+ my $counter = 0;
+ $self->sync_source->log("Discovering ticket history");
+
+ my ( $last_txn, @changesets );
+ my $previously_modified = App::SD::Util::string_to_datetime( $self->sync_source->upstream_last_modified_date );
+
+ my $progress = Time::Progress->new();
+ $progress->attr( max => $#tickets );
+
+ local $| = 1;
+
+ my $last_modified;
+
+ for my $ticket (@tickets) {
+ $counter++;
+
+ my $ticket_id = $ticket->{id};
+
+ print $progress->report( "%30b %p Est: %E\r", $counter );
+ $self->sync_source->log( "Fetching ticket $ticket_id - $counter of " . scalar @tickets );
+
+ my $final_state = $self->_translate_final_ticket_state($ticket);
+ my $initial_state = {%$final_state};
+
+ my $transactions = $self->find_matching_transactions(
+ ticket => $ticket,
+ starting_transaction =>
+ $self->sync_source->app_handle->handle->last_changeset_from_source(
+ $self->sync_source->uuid_for_remote_id($ticket_id)
+ ) || 1
+ );
+
+ # Walk transactions newest to oldest.
+ my $txn_counter = 0;
+ for my $txn ( sort { $b->{'serial'} <=> $a->{'serial'} } @$transactions ) {
+ my $created = App::SD::Util::string_to_datetime( $txn->{timesta} );
+
+ $last_modified = $txn->{timestamp} if ( !$last_modified || ( $txn->{timestamp} > $last_modified ) );
+
+ $txn_counter++;
+ $self->sync_source->log( "$ticket_id Transcoding transaction $txn_counter of " . scalar @$transactions );
+ my $changeset = $self->transcode_one_txn( $txn, $initial_state, $final_state );
+ next unless $changeset && $changeset->has_changes;
+ # the changesets are older than the ones that came before, so they goes first
+ unshift @changesets, $changeset;
+ }
+ }
+
+ my $cs_counter = 0;
+ for (@changesets) {
+ $self->sync_source->log(
+ "Applying changeset " . ++$cs_counter . " of " . scalar @changesets );
+ $args{callback}->($_);
+ }
+
+ $self->sync_source->record_upstream_last_modified_date($last_modified)
+ if ( ( $last_modified ? $last_modified->epoch : 0 )
+ > ( $previously_modified ? $previously_modified->epoch : 0 ) );
+
+}
+
sub warp_list_to_old_value {
my $self = shift;
diff --git a/lib/App/SD/Replica/rt/PullEncoder.pm b/lib/App/SD/Replica/rt/PullEncoder.pm
index 4faff30..696cddb 100644
--- a/lib/App/SD/Replica/rt/PullEncoder.pm
+++ b/lib/App/SD/Replica/rt/PullEncoder.pm
@@ -11,73 +11,7 @@ has sync_source =>
( isa => 'App::SD::Replica::rt',
is => 'rw');
-sub run {
- my $self = shift;
- my %args = validate(
- @_,
- { after => 1,
- callback => 1,
- }
- );
-
-
- my @tickets = $self->find_matching_tickets( $self->sync_source->query );
- $self->sync_source->log("No tickets found.") if @tickets == 0;
-
- my $counter = 0;
- $self->sync_source->log("Discovering ticket history");
-
- my ( $last_txn, @changesets );
- my $previously_modified = App::SD::Util::string_to_datetime( $self->sync_source->upstream_last_modified_date );
-
- my $last_modified;
- my $progress = Time::Progress->new();
- $progress->attr( max => $#tickets );
-
- local $| = 1;
-
- for my $ticket (@tickets) {
- $counter++;
-
- my $ticket_id = $ticket->{id};
-
- print $progress->report( "%30b %p Est: %E\r", $counter );
-
- $self->sync_source->log( "Fetching ticket $ticket_id - $counter of " . scalar @tickets );
- my $final_state= $self->_translate_final_ticket_state( $ticket);
- my @transactions = $self->find_matching_transactions(
- ticket => $ticket,
- starting_transaction => $self->sync_source->app_handle->handle->last_changeset_from_source( $self->sync_source->uuid_for_remote_id( $ticket_id )
-
-
- )|| 1 ) ;
-
- my $txn_counter = 0;
- for my $txn ( sort { $b->{'id'} <=> $a->{'id'} } @transactions ) {
- my $created = App::SD::Util::string_to_datetime( $txn->{Created} );
-
- $last_modified = $created if ( !$last_modified || ( $created > $last_modified ) );
- $last_txn = $txn->{'id'} if ( !$last_txn || ( $txn->{id} > $last_txn ) );
-
- $txn_counter++;
- $self->sync_source->log( "Transcoding transaction @{[$txn->{'id'}]} - $txn_counter of " . scalar @transactions );
- my $changeset = $self->transcode_one_txn( $txn, $final_state );
- $changeset->created( $txn->{'Created'} );
- next unless $changeset->has_changes;
- unshift @changesets, $changeset;
- }
- }
-
- my $cs_counter = 0;
- for (@changesets) {
- $self->sync_source->log( "Applying changeset " . ++$cs_counter . " of " . scalar @changesets );
- $args{callback}->($_);
- }
-
- $self->sync_source->record_upstream_last_modified_date($last_modified) if ( ( $last_modified ? $last_modified->epoch : 0 ) > ( $previously_modified ? $previously_modified->epoch : 0 ) );
-# $self->sync_source->record_upstream_last_txn($last_txn) if ( ( $last_txn || 0 ) > ( $self->sync_source->upstream_last_txn || 0 ) );
-}
sub _translate_final_ticket_state {
my $self = shift;
@@ -128,7 +62,7 @@ sub _translate_final_ticket_state {
return $ticket;
}
-=head2 find_matching_tickets QUERY
+=head2 find_matching_tickets query => QUERY
Returns an RT::Client ticket collection for all tickets found matching your QUERY string.
@@ -136,8 +70,8 @@ Returns an RT::Client ticket collection for all tickets found matching your QUER
sub find_matching_tickets {
my $self = shift;
- my ($query) = validate_pos( @_, 1 );
-
+ my %args = validate(@_,{query => 1});
+ my $query = $args{query};
# If we've ever synced, we can limit our search to only newer things
if ( my $before = $self->_only_pull_tickets_modified_after ) {
$query = "($query) AND LastUpdated >= '" . $before->ymd('-') . " " . $before->hms(':') . "'";
@@ -187,14 +121,18 @@ sub find_matching_transactions {
}
}
- push @txns, $txn_hash;
+ push @txns, { timestamp => App::SD::Util::string_to_datetime( $txn_hash->{Created} ),
+ serial => $txn_hash->{id},
+ object => $txn_hash};
}
- return @txns;
+ return \@txns;
}
sub transcode_one_txn {
- my ($self, $txn, $ticket) = (@_);
-
+ my ($self, $txn_wrapper, $ticket_initial_state, $ticket) = (@_);
+
+ my $txn = $txn_wrapper->{object};
+
my $sub = $self->can( '_recode_txn_' . $txn->{'Type'} );
unless ( $sub ) {
die "Transaction type $txn->{Type} (for transaction $txn->{id}) not implemented yet";
@@ -202,6 +140,7 @@ sub transcode_one_txn {
my $changeset = Prophet::ChangeSet->new(
{ original_source_uuid => $self->sync_source->uuid_for_remote_id( $ticket->{ $self->sync_source->uuid . '-id' } ),
original_sequence_no => $txn->{'id'},
+ created => $txn->{'Created'},
creator => $self->resolve_user_id_to( email_address => $txn->{'Creator'} ),
}
);
diff --git a/lib/App/SD/Replica/trac/PullEncoder.pm b/lib/App/SD/Replica/trac/PullEncoder.pm
index 7439a05..8c40ebf 100644
--- a/lib/App/SD/Replica/trac/PullEncoder.pm
+++ b/lib/App/SD/Replica/trac/PullEncoder.pm
@@ -9,76 +9,7 @@ use DateTime;
has sync_source => (
isa => 'App::SD::Replica::trac',
- is => 'rw'
-);
-
-sub run { # trac
- my $self = shift;
- my %args = validate( @_, { after => 1, callback => 1, });
-
- $self->sync_source->log('Finding matching tickets');
-
- my @tickets = $self->find_matching_tickets( query => $self->sync_source->query );
-
- if ( @tickets == 0 ) {
- $self->sync_source->log("No tickets found.");
- return;
- }
-
- my @changesets;
- my $counter = 0;
- $self->sync_source->log("Discovering ticket history");
-
- my $progress = Time::Progress->new();
- $progress->attr( max => $#tickets );
-
- local $| = 1;
-
- my $last_modified_date;
-
-
- for my $ticket (@tickets) {
-
- $counter++;
- print $progress->report( "%30b %p Est: %E\r", $counter );
- $self->sync_source->log( "Fetching ticket @{[$ticket->id]} - $counter of " . scalar @tickets );
-
- $last_modified_date = $ticket->last_modified if ( !$last_modified_date || $ticket->last_modified > $last_modified_date );
-
- my $ticket_data = $self->_translate_final_ticket_state($ticket);
- my $ticket_initial_data = {%$ticket_data};
-
- my $transactions = $self->find_matching_transactions(
- ticket => $ticket,
- starting_transaction =>
- $self->sync_source->app_handle->handle->last_changeset_from_source(
- $self->sync_source->uuid_for_remote_id( $ticket->id )
- )
- );
-
-
-
- # Walk transactions newest to oldest.
- for my $txn ( sort { $b->date <=> $a->date } @$transactions ) {
- $self->sync_source->log( $ticket->id . " - Transcoding transaction @{[$txn->{date}]} " );
-
- # the changesets are older than the ones that came before, so they goes first
- unshift @changesets,
- grep {defined} $self->transcode_one_txn( $txn, $ticket_initial_data, $ticket_data );
- }
- }
-
- my $cs_counter = 0;
- for (@changesets) {
- $self->sync_source->log( "Applying changeset " . ++$cs_counter . " of " . scalar @changesets );
- $args{callback}->($_);
- }
-
- $self->sync_source->record_upstream_last_modified_date($last_modified_date);
-
-
-
-}
+ is => 'rw');
sub _translate_final_ticket_state {
@@ -160,7 +91,9 @@ sub find_matching_transactions {
next if ($self->sync_source->foreign_transaction_originated_locally($txn_date, $args{'ticket'}->id) );
# ok. it didn't originate locally. we might want to integrate it
- push @txns, $txn;
+ push @txns, { timestamp => $txn->date,
+ serial => $txn->date->epoch,
+ object => $txn};
}
$self->sync_source->log('Done looking at pulled txns');
return \@txns;
@@ -230,8 +163,9 @@ sub transcode_create_txn {
# 1 changeset if it was a normal txn
# 2 changesets if we needed to to some magic fixups.
sub transcode_one_txn {
- my ( $self, $txn, $ticket, $ticket_final ) = (@_);
+ my ( $self, $txn_wrapper, $ticket, $ticket_final ) = (@_);
+ my $txn = $txn_wrapper->{object};
if ($txn->is_create) {
return $self->transcode_create_txn($txn,$ticket,$ticket_final);
commit 11425496351bfe55d6967a37d3dd4df8be2d54e5
Author: Jesse Vincent <jesse at bestpractical.com>
Date: Mon May 18 09:08:31 2009 -0400
cleanup toward unification of pullencoder code
diff --git a/lib/App/SD/ForeignReplica/PullEncoder.pm b/lib/App/SD/ForeignReplica/PullEncoder.pm
index 33d5e01..a6ef447 100644
--- a/lib/App/SD/ForeignReplica/PullEncoder.pm
+++ b/lib/App/SD/ForeignReplica/PullEncoder.pm
@@ -9,9 +9,9 @@ sub run {
$self->sync_source->log('Finding matching tickets');
- my @tickets = $self->find_matching_tickets( query => $self->sync_source->query );
+ my $tickets = $self->find_matching_tickets( query => $self->sync_source->query );
- if ( @tickets == 0 ) {
+ if ( scalar @$tickets == 0 ) {
$self->sync_source->log("No tickets found.");
return;
}
@@ -23,19 +23,19 @@ sub run {
my $previously_modified = App::SD::Util::string_to_datetime( $self->sync_source->upstream_last_modified_date );
my $progress = Time::Progress->new();
- $progress->attr( max => $#tickets );
+ $progress->attr( max => $#$tickets );
local $| = 1;
my $last_modified;
- for my $ticket (@tickets) {
+ for my $ticket (@$tickets) {
$counter++;
my $ticket_id = $ticket->{id};
print $progress->report( "%30b %p Est: %E\r", $counter );
- $self->sync_source->log( "Fetching ticket $ticket_id - $counter of " . scalar @tickets );
+ $self->sync_source->log( "Fetching ticket $ticket_id - $counter of " . scalar @$tickets );
my $final_state = $self->_translate_final_ticket_state($ticket);
my $initial_state = {%$final_state};
diff --git a/lib/App/SD/Replica/rt/PullEncoder.pm b/lib/App/SD/Replica/rt/PullEncoder.pm
index 696cddb..4c546d9 100644
--- a/lib/App/SD/Replica/rt/PullEncoder.pm
+++ b/lib/App/SD/Replica/rt/PullEncoder.pm
@@ -77,11 +77,11 @@ sub find_matching_tickets {
$query = "($query) AND LastUpdated >= '" . $before->ymd('-') . " " . $before->hms(':') . "'";
$self->sync_source->log( "Skipping all tickets not updated since " . $before->iso8601 );
}
- return map {
+ return [map {
my $hash = $self->sync_source->rt->show( type => 'ticket', id => $_ );
$hash->{id} =~ s|^ticket/||g;
$hash
- } $self->sync_source->rt->search( type => 'ticket', query => $query );
+ } $self->sync_source->rt->search( type => 'ticket', query => $query )];
}
diff --git a/lib/App/SD/Replica/trac/PullEncoder.pm b/lib/App/SD/Replica/trac/PullEncoder.pm
index 8c40ebf..064dd18 100644
--- a/lib/App/SD/Replica/trac/PullEncoder.pm
+++ b/lib/App/SD/Replica/trac/PullEncoder.pm
@@ -66,7 +66,7 @@ sub find_matching_tickets {
# >= is wasteful but may catch race conditions
@results = grep {$_->last_modified >= $last_changeset_seen_dt} @results;
}
- return @results;
+ return \@results;
}
=head2 find_matching_transactions { ticket => $id, starting_transaction => $num }
@@ -78,7 +78,7 @@ Returns a reference to an array of all transactions (as hashes) on ticket $id af
sub find_matching_transactions {
my $self = shift;
my %args = validate( @_, { ticket => 1, starting_transaction => 1 } );
- my @raw_txns = @{$args{ticket}->history->entries};
+ my @raw_txns = $args{ticket}->comments;
my @txns;
# XXX TODO make this one loop.
-----------------------------------------------------------------------
More information about the Bps-public-commit
mailing list