[Bps-public-commit] SD branch, master, updated. 11425496351bfe55d6967a37d3dd4df8be2d54e5

jesse jesse at bestpractical.com
Mon May 18 09:09:20 EDT 2009


The branch, master has been updated
       via  11425496351bfe55d6967a37d3dd4df8be2d54e5 (commit)
       via  d1961a07c10f425e03d9e0b7d77e51cf399e5eaa (commit)
      from  51a4c8f634892f633fced404cb389f48406cde22 (commit)

Summary of changes:
 lib/App/SD/ForeignReplica/PullEncoder.pm |   76 +++++++++++++++++++++++++
 lib/App/SD/Replica/rt/PullEncoder.pm     |   89 +++++-------------------------
 lib/App/SD/Replica/trac/PullEncoder.pm   |   82 +++-------------------------
 3 files changed, 98 insertions(+), 149 deletions(-)

- Log -----------------------------------------------------------------
commit d1961a07c10f425e03d9e0b7d77e51cf399e5eaa
Author: Jesse Vincent <jesse at bestpractical.com>
Date:   Fri May 15 14:38:47 2009 -0400

    Extract the 'run'  method from the trac and RT PullEncoders

diff --git a/lib/App/SD/ForeignReplica/PullEncoder.pm b/lib/App/SD/ForeignReplica/PullEncoder.pm
index 20448f4..33d5e01 100644
--- a/lib/App/SD/ForeignReplica/PullEncoder.pm
+++ b/lib/App/SD/ForeignReplica/PullEncoder.pm
@@ -1,6 +1,82 @@
 package App::SD::ForeignReplica::PullEncoder;
 use Any::Moose;
 use App::SD::Util;
+use Params::Validate qw/validate/;
+
+sub run {
+    my $self = shift;
+    my %args = validate( @_, {   after    => 1, callback => 1, });
+
+    $self->sync_source->log('Finding matching tickets');
+    
+    my @tickets = $self->find_matching_tickets( query => $self->sync_source->query );
+
+    if ( @tickets == 0 ) {
+        $self->sync_source->log("No tickets found.");
+        return;
+    }
+
+    my $counter = 0;
+    $self->sync_source->log("Discovering ticket history");
+
+    my ( $last_txn, @changesets );
+    my $previously_modified = App::SD::Util::string_to_datetime( $self->sync_source->upstream_last_modified_date );
+
+    my $progress = Time::Progress->new();
+    $progress->attr( max => $#tickets );
+
+    local $| = 1;
+    
+    my $last_modified;
+
+    for my $ticket (@tickets) {
+        $counter++;
+
+        my $ticket_id = $ticket->{id};
+
+        print $progress->report( "%30b %p Est: %E\r", $counter );
+        $self->sync_source->log( "Fetching ticket $ticket_id - $counter of " . scalar @tickets );
+
+        my $final_state         = $self->_translate_final_ticket_state($ticket);
+        my $initial_state       = {%$final_state};
+
+        my $transactions = $self->find_matching_transactions(
+            ticket => $ticket,
+            starting_transaction =>
+                $self->sync_source->app_handle->handle->last_changeset_from_source(
+                $self->sync_source->uuid_for_remote_id($ticket_id)
+                ) || 1
+        );
+
+        # Walk transactions newest to oldest.
+        my $txn_counter = 0;
+        for my $txn ( sort { $b->{'serial'} <=> $a->{'serial'} } @$transactions ) {
+            my $created =  App::SD::Util::string_to_datetime( $txn->{timesta} );
+
+            $last_modified = $txn->{timestamp}     if ( !$last_modified || ( $txn->{timestamp} > $last_modified ) );
+
+            $txn_counter++;
+            $self->sync_source->log( "$ticket_id Transcoding transaction $txn_counter of " . scalar @$transactions );
+            my $changeset = $self->transcode_one_txn( $txn, $initial_state, $final_state );
+            next unless $changeset && $changeset->has_changes;
+            # the changesets are older than the ones that came before, so they goes first
+            unshift @changesets, $changeset;
+        }
+    }
+
+    my $cs_counter = 0;
+    for (@changesets) {
+        $self->sync_source->log(
+            "Applying changeset " . ++$cs_counter . " of " . scalar @changesets );
+        $args{callback}->($_);
+    }
+
+    $self->sync_source->record_upstream_last_modified_date($last_modified)
+        if ( ( $last_modified ? $last_modified->epoch : 0 )
+        > ( $previously_modified ? $previously_modified->epoch : 0 ) );
+
+}
+
 
 sub warp_list_to_old_value {
     my $self    = shift;
diff --git a/lib/App/SD/Replica/rt/PullEncoder.pm b/lib/App/SD/Replica/rt/PullEncoder.pm
index 4faff30..696cddb 100644
--- a/lib/App/SD/Replica/rt/PullEncoder.pm
+++ b/lib/App/SD/Replica/rt/PullEncoder.pm
@@ -11,73 +11,7 @@ has sync_source =>
     ( isa => 'App::SD::Replica::rt',
       is => 'rw');
 
-sub run {
-    my $self = shift;
-    my %args = validate(
-        @_,
-        {   after    => 1,
-            callback => 1,
-        }
-    );
-
-
-    my @tickets = $self->find_matching_tickets( $self->sync_source->query );
-    $self->sync_source->log("No tickets found.") if @tickets == 0;
-
-    my $counter = 0;
-    $self->sync_source->log("Discovering ticket history");
-
-    my ( $last_txn, @changesets );
-    my $previously_modified = App::SD::Util::string_to_datetime( $self->sync_source->upstream_last_modified_date );
-
-    my $last_modified;
-    my $progress = Time::Progress->new();
-    $progress->attr( max => $#tickets );
-
-    local $| = 1;
-
-    for my $ticket (@tickets) {
-        $counter++;
-
-        my $ticket_id = $ticket->{id};
-
-        print $progress->report( "%30b %p Est: %E\r", $counter );
-    
-        $self->sync_source->log( "Fetching ticket $ticket_id - $counter of " . scalar @tickets );
 
-        my $final_state= $self->_translate_final_ticket_state( $ticket);
-        my @transactions = $self->find_matching_transactions( 
-                ticket               => $ticket, 
-                starting_transaction => $self->sync_source->app_handle->handle->last_changeset_from_source( $self->sync_source->uuid_for_remote_id( $ticket_id )
-
-            
-                                          )|| 1 ) ;
-
-        my $txn_counter = 0;
-        for my $txn ( sort { $b->{'id'} <=> $a->{'id'} } @transactions ) {
-            my $created = App::SD::Util::string_to_datetime( $txn->{Created} );
-
-            $last_modified = $created     if ( !$last_modified || ( $created > $last_modified ) );
-            $last_txn      = $txn->{'id'} if ( !$last_txn      || ( $txn->{id} > $last_txn ) );
-
-            $txn_counter++;
-            $self->sync_source->log( "Transcoding transaction  @{[$txn->{'id'}]} - $txn_counter of " . scalar @transactions );
-            my $changeset = $self->transcode_one_txn( $txn, $final_state );
-            $changeset->created( $txn->{'Created'} );
-            next unless $changeset->has_changes;
-            unshift @changesets, $changeset;
-        }
-    }
-
-    my $cs_counter = 0;
-    for (@changesets) {
-        $self->sync_source->log( "Applying changeset " . ++$cs_counter . " of " . scalar @changesets );
-        $args{callback}->($_);
-    }
-
-    $self->sync_source->record_upstream_last_modified_date($last_modified) if ( ( $last_modified ? $last_modified->epoch : 0 ) > ( $previously_modified ? $previously_modified->epoch : 0 ) );
-#    $self->sync_source->record_upstream_last_txn($last_txn) if ( ( $last_txn || 0 ) > ( $self->sync_source->upstream_last_txn || 0 ) );
-}
 
 sub _translate_final_ticket_state {
     my $self   = shift;
@@ -128,7 +62,7 @@ sub _translate_final_ticket_state {
     return $ticket;
 }
 
-=head2 find_matching_tickets QUERY
+=head2 find_matching_tickets query => QUERY
 
 Returns an RT::Client ticket collection for all tickets found matching your QUERY string.
 
@@ -136,8 +70,8 @@ Returns an RT::Client ticket collection for all tickets found matching your QUER
 
 sub find_matching_tickets {
     my $self = shift;
-    my ($query) = validate_pos( @_, 1 );
-
+    my %args = validate(@_,{query => 1});
+    my $query = $args{query};
     # If we've ever synced, we can limit our search to only newer things
     if ( my $before = $self->_only_pull_tickets_modified_after ) {
        $query = "($query) AND LastUpdated >= '" . $before->ymd('-') . " " . $before->hms(':') . "'";
@@ -187,14 +121,18 @@ sub find_matching_transactions {
             }
 
         }
-        push @txns, $txn_hash;
+        push @txns, { timestamp => App::SD::Util::string_to_datetime( $txn_hash->{Created} ),
+                      serial => $txn_hash->{id},
+                      object => $txn_hash};
     }
-    return @txns;
+    return \@txns;
 }
 
 sub transcode_one_txn {
-    my ($self, $txn, $ticket) = (@_);
-    
+    my ($self, $txn_wrapper, $ticket_initial_state, $ticket) = (@_);
+   
+    my $txn = $txn_wrapper->{object};
+
     my $sub = $self->can( '_recode_txn_' . $txn->{'Type'} );
     unless ( $sub ) {
         die "Transaction type $txn->{Type} (for transaction $txn->{id}) not implemented yet";
@@ -202,6 +140,7 @@ sub transcode_one_txn {
     my $changeset = Prophet::ChangeSet->new(
         {   original_source_uuid => $self->sync_source->uuid_for_remote_id( $ticket->{ $self->sync_source->uuid . '-id' } ),
             original_sequence_no => $txn->{'id'},
+            created =>  $txn->{'Created'},
             creator => $self->resolve_user_id_to( email_address => $txn->{'Creator'} ),
         }
     );
diff --git a/lib/App/SD/Replica/trac/PullEncoder.pm b/lib/App/SD/Replica/trac/PullEncoder.pm
index 7439a05..8c40ebf 100644
--- a/lib/App/SD/Replica/trac/PullEncoder.pm
+++ b/lib/App/SD/Replica/trac/PullEncoder.pm
@@ -9,76 +9,7 @@ use DateTime;
 
 has sync_source => (
     isa => 'App::SD::Replica::trac',
-    is  => 'rw'
-);
-
-sub run { # trac
-    my $self = shift;
-    my %args = validate( @_, {   after    => 1, callback => 1, });
-
-    $self->sync_source->log('Finding matching tickets');
-        
-    my @tickets = $self->find_matching_tickets( query => $self->sync_source->query );
-
-    if ( @tickets == 0 ) {
-        $self->sync_source->log("No tickets found.");
-        return;
-    }
-
-    my @changesets;
-    my $counter = 0;
-    $self->sync_source->log("Discovering ticket history");
-
-    my $progress = Time::Progress->new();
-    $progress->attr( max => $#tickets );
-
-    local $| = 1;
-
-    my $last_modified_date;
-
-
-    for my $ticket (@tickets) {
-
-        $counter++;
-        print $progress->report( "%30b %p Est: %E\r", $counter );
-        $self->sync_source->log( "Fetching ticket @{[$ticket->id]} - $counter of " . scalar @tickets );
-
-        $last_modified_date = $ticket->last_modified if ( !$last_modified_date || $ticket->last_modified > $last_modified_date );
-
-        my $ticket_data         = $self->_translate_final_ticket_state($ticket);
-        my $ticket_initial_data = {%$ticket_data};
-
-        my $transactions = $self->find_matching_transactions(
-            ticket => $ticket,
-            starting_transaction =>
-                $self->sync_source->app_handle->handle->last_changeset_from_source(
-                $self->sync_source->uuid_for_remote_id( $ticket->id )
-                )
-        );
-
-
-
-        # Walk transactions newest to oldest.
-        for my $txn ( sort { $b->date <=> $a->date } @$transactions ) {
-            $self->sync_source->log( $ticket->id . " - Transcoding transaction  @{[$txn->{date}]} " );
-
-            # the changesets are older than the ones that came before, so they goes first
-            unshift @changesets,
-                grep {defined} $self->transcode_one_txn( $txn, $ticket_initial_data, $ticket_data );
-        }
-    }
-
-    my $cs_counter = 0;
-    for (@changesets) {
-        $self->sync_source->log( "Applying changeset " . ++$cs_counter . " of " . scalar @changesets );
-        $args{callback}->($_);
-    }
-
-    $self->sync_source->record_upstream_last_modified_date($last_modified_date);
-
-
-
-}
+    is  => 'rw');
 
 
 sub _translate_final_ticket_state {
@@ -160,7 +91,9 @@ sub find_matching_transactions {
         next if ($self->sync_source->foreign_transaction_originated_locally($txn_date, $args{'ticket'}->id) );
 
         # ok. it didn't originate locally. we might want to integrate it
-        push @txns, $txn;
+        push @txns, { timestamp => $txn->date,
+                      serial => $txn->date->epoch,
+                      object => $txn};
     }
     $self->sync_source->log('Done looking at pulled txns');
     return \@txns;
@@ -230,8 +163,9 @@ sub transcode_create_txn {
             # 1 changeset if it was a normal txn
             # 2 changesets if we needed to to some magic fixups.
 sub transcode_one_txn {
-    my ( $self, $txn, $ticket, $ticket_final ) = (@_);
+    my ( $self, $txn_wrapper, $ticket, $ticket_final ) = (@_);
 
+    my $txn = $txn_wrapper->{object};
 
     if ($txn->is_create) {
         return $self->transcode_create_txn($txn,$ticket,$ticket_final);

commit 11425496351bfe55d6967a37d3dd4df8be2d54e5
Author: Jesse Vincent <jesse at bestpractical.com>
Date:   Mon May 18 09:08:31 2009 -0400

    cleanup toward unification of pullencoder code

diff --git a/lib/App/SD/ForeignReplica/PullEncoder.pm b/lib/App/SD/ForeignReplica/PullEncoder.pm
index 33d5e01..a6ef447 100644
--- a/lib/App/SD/ForeignReplica/PullEncoder.pm
+++ b/lib/App/SD/ForeignReplica/PullEncoder.pm
@@ -9,9 +9,9 @@ sub run {
 
     $self->sync_source->log('Finding matching tickets');
     
-    my @tickets = $self->find_matching_tickets( query => $self->sync_source->query );
+    my $tickets = $self->find_matching_tickets( query => $self->sync_source->query );
 
-    if ( @tickets == 0 ) {
+    if ( scalar @$tickets == 0 ) {
         $self->sync_source->log("No tickets found.");
         return;
     }
@@ -23,19 +23,19 @@ sub run {
     my $previously_modified = App::SD::Util::string_to_datetime( $self->sync_source->upstream_last_modified_date );
 
     my $progress = Time::Progress->new();
-    $progress->attr( max => $#tickets );
+    $progress->attr( max => $#$tickets );
 
     local $| = 1;
     
     my $last_modified;
 
-    for my $ticket (@tickets) {
+    for my $ticket (@$tickets) {
         $counter++;
 
         my $ticket_id = $ticket->{id};
 
         print $progress->report( "%30b %p Est: %E\r", $counter );
-        $self->sync_source->log( "Fetching ticket $ticket_id - $counter of " . scalar @tickets );
+        $self->sync_source->log( "Fetching ticket $ticket_id - $counter of " . scalar @$tickets );
 
         my $final_state         = $self->_translate_final_ticket_state($ticket);
         my $initial_state       = {%$final_state};
diff --git a/lib/App/SD/Replica/rt/PullEncoder.pm b/lib/App/SD/Replica/rt/PullEncoder.pm
index 696cddb..4c546d9 100644
--- a/lib/App/SD/Replica/rt/PullEncoder.pm
+++ b/lib/App/SD/Replica/rt/PullEncoder.pm
@@ -77,11 +77,11 @@ sub find_matching_tickets {
        $query = "($query) AND LastUpdated >= '" . $before->ymd('-') . " " . $before->hms(':') . "'";
         $self->sync_source->log( "Skipping all tickets not updated since " . $before->iso8601 );
     }
-    return map {
+    return [map {
         my $hash = $self->sync_source->rt->show( type => 'ticket', id => $_ );
         $hash->{id} =~ s|^ticket/||g;
         $hash
-    } $self->sync_source->rt->search( type => 'ticket', query => $query );
+    } $self->sync_source->rt->search( type => 'ticket', query => $query )];
 }
 
 
diff --git a/lib/App/SD/Replica/trac/PullEncoder.pm b/lib/App/SD/Replica/trac/PullEncoder.pm
index 8c40ebf..064dd18 100644
--- a/lib/App/SD/Replica/trac/PullEncoder.pm
+++ b/lib/App/SD/Replica/trac/PullEncoder.pm
@@ -66,7 +66,7 @@ sub find_matching_tickets {
         # >= is wasteful but may catch race conditions
         @results = grep {$_->last_modified >= $last_changeset_seen_dt} @results; 
     }
-    return @results;
+    return \@results;
 }
 
 =head2 find_matching_transactions { ticket => $id, starting_transaction => $num  }
@@ -78,7 +78,7 @@ Returns a reference to an array of all transactions (as hashes) on ticket $id af
 sub find_matching_transactions { 
     my $self = shift;
     my %args = validate( @_, { ticket => 1, starting_transaction => 1 } );
-    my @raw_txns = @{$args{ticket}->history->entries};
+    my @raw_txns = $args{ticket}->comments;
 
     my @txns;
     # XXX TODO make this one loop.

-----------------------------------------------------------------------



More information about the Bps-public-commit mailing list