[Bps-public-commit] SD branch, master, updated. 6a5853eec7b9eee0ff219cc6e9df0786d37434bd

jesse jesse at bestpractical.com
Thu May 14 20:49:55 EDT 2009


The branch, master has been updated
       via  6a5853eec7b9eee0ff219cc6e9df0786d37434bd (commit)
       via  1a3e560bbc64b2a0ab8bd808c2f66088146ef60b (commit)
       via  1a02fd13c7570d416a9e76678f0465ef42dbef35 (commit)
       via  d6c20712bc1b318dc952cb96f7185e8f0aeb6fbd (commit)
       via  fc679e773adad13a80006e721dbd1b217e43dec3 (commit)
       via  9448f6b069fffc84772f2db5958f084e03abb38e (commit)
       via  ed6847cfe68a7d1b7fc3692aa0e6350efc0f97ba (commit)
       via  9003c2a45c355db7a75d7a69f7eb84a5fdf8f4fe (commit)
      from  e611e17b537573972897eebc90f6d491bd4cb06e (commit)

Summary of changes:
 lib/App/SD/ForeignReplica.pm             |    2 +-
 lib/App/SD/ForeignReplica/PushEncoder.pm |   60 ++++++++++++++++
 lib/App/SD/Replica/rt.pm                 |   20 ++----
 lib/App/SD/Replica/rt/PullEncoder.pm     |  108 ++++++++++++++----------------
 lib/App/SD/Replica/rt/PushEncoder.pm     |   63 ++----------------
 lib/App/SD/Replica/trac.pm               |    9 +--
 lib/App/SD/Replica/trac/PullEncoder.pm   |   69 +++++++++++--------
 lib/App/SD/Replica/trac/PushEncoder.pm   |   59 +---------------
 t/sd-rt/basic.t                          |   11 ++-
 9 files changed, 175 insertions(+), 226 deletions(-)

- Log -----------------------------------------------------------------
commit 9003c2a45c355db7a75d7a69f7eb84a5fdf8f4fe
Author: Jesse Vincent <jesse at bestpractical.com>
Date:   Thu May 14 15:36:54 2009 -0400

    Some renaming toward extracting shared code from the PullEncoders

diff --git a/lib/App/SD/Replica/trac.pm b/lib/App/SD/Replica/trac.pm
index 000012d..b190c6e 100644
--- a/lib/App/SD/Replica/trac.pm
+++ b/lib/App/SD/Replica/trac.pm
@@ -16,7 +16,7 @@ use Prophet::ChangeSet;
 
 has trac => ( isa => 'Net::Trac::Connection', is => 'rw');
 has remote_url => ( isa => 'Str', is => 'rw');
-
+has query => ( isa => 'Maybe[Str]', is => 'rw');
 sub foreign_username { return shift->trac->user(@_) }
 
 sub BUILD {
@@ -55,8 +55,7 @@ sub get_txn_list_by_date {
     my $ticket_obj = Net::Trac::Ticket->new( connection => $self->trac);
     $ticket_obj->load($ticket);
         
-    my @txns   = map { { id => $_->date->epoch, creator => $_->author, created => $_->date->epoch } }
-        sort {$b->date <=> $a->date }  @{$ticket_obj->history->entries};
+    my @txns   = map { { id => $_->date->epoch, creator => $_->author, created => $_->date->epoch } } sort {$b->date <=> $a->date }  @{$ticket_obj->history->entries};
     return @txns;
 }
         
diff --git a/lib/App/SD/Replica/trac/PullEncoder.pm b/lib/App/SD/Replica/trac/PullEncoder.pm
index e563a3c..2cc6db0 100644
--- a/lib/App/SD/Replica/trac/PullEncoder.pm
+++ b/lib/App/SD/Replica/trac/PullEncoder.pm
@@ -12,16 +12,13 @@ has sync_source => (
     is  => 'rw'
 );
 
-sub run {
+sub run { # trac
     my $self = shift;
-    my %args = validate(
-        @_,
-        {   after    => 1,
-            callback => 1,
-        }
-    );
+    my %args = validate( @_, {   after    => 1, callback => 1, });
+
     $self->sync_source->log('Finding matching tickets');
-    my @tickets = @{ $self->find_matching_tickets() };
+        
+    my @tickets = $self->find_matching_tickets( query => $self->sync_source->query );
 
     if ( @tickets == 0 ) {
         $self->sync_source->log("No tickets found.");
@@ -31,44 +28,50 @@ sub run {
     my @changesets;
     my $counter = 0;
     $self->sync_source->log("Discovering ticket history");
+
     my $progress = Time::Progress->new();
     $progress->attr( max => $#tickets );
+
     local $| = 1;
 
     my $last_modified_date;
 
+
     for my $ticket (@tickets) {
+
+        $counter++;
         print $progress->report( "%30b %p Est: %E\r", $counter );
-        $self->sync_source->log(
-            "Fetching ticket @{[$ticket->id]} - " . ++$counter . " of " . scalar @tickets );
+        $self->sync_source->log( "Fetching ticket @{[$ticket->id]} - $counter of " . scalar @tickets );
 
-        $last_modified_date = $ticket->last_modified
-            if ( !$last_modified_date || $ticket->last_modified > $last_modified_date );
+        $last_modified_date = $ticket->last_modified if ( !$last_modified_date || $ticket->last_modified > $last_modified_date );
 
         my $ticket_data         = $self->_translate_final_ticket_state($ticket);
         my $ticket_initial_data = {%$ticket_data};
-        my $txns                = $self->skip_previously_seen_transactions(
-            ticket       => $ticket,
-            transactions => $ticket->history->entries,
-            starting_transaction => $self->sync_source->app_handle->handle->last_changeset_from_source(
- $self->sync_source->uuid_for_remote_id( $ticket->id )
-        )
 
-        );
-    
+        my $transactions = $self->find_matching_transactions( ticket => $ticket, starting_transaction => $self->sync_source->app_handle->handle->last_changeset_from_source( $self->sync_source->uuid_for_remote_id( $ticket->id )) );
+
+
+
         # Walk transactions newest to oldest.
-        for my $txn ( sort { $b->date <=> $a->date } @$txns ) {
-            $self->sync_source->log( $ticket->id . " - Transcoding transaction  @{[$txn->date]} " );
+        for my $txn ( sort { $b->date <=> $a->date } @$transactions ) {
+            $self->sync_source->log( $ticket->id . " - Transcoding transaction  @{[$txn->{date}]} " );
 
             # the changesets are older than the ones that came before, so they goes first
             unshift @changesets,
                 grep {defined} $self->transcode_one_txn( $txn, $ticket_initial_data, $ticket_data );
         }
+    }
 
+    my $cs_counter = 0;
+    for (@changesets) {
+        $self->sync_source->log( "Applying changeset " . ++$cs_counter . " of " . scalar @changesets );
+        $args{callback}->($_);
     }
 
-    $args{callback}->($_) for @changesets;
     $self->sync_source->record_upstream_last_modified_date($last_modified_date);
+
+
+
 }
 
 
@@ -108,7 +111,7 @@ sub _translate_final_ticket_state {
 
 =head2 find_matching_tickets QUERY
 
-Returns a Trac::TicketSearch collection for all tickets found matching your QUERY hash.
+Returns an array of all tickets found matching your QUERY hash.
 
 =cut
 
@@ -126,21 +129,23 @@ sub find_matching_tickets {
         # >= is wasteful but may catch race conditions
         @results = grep {$_->last_modified >= $last_changeset_seen_dt} @results; 
     }
-    return \@results;
+    return @results;
 }
 
-=head2 skip_previously_seen_transactions { ticket => $id, starting_transaction => $num, transactions => \@txns  }
+=head2 find_matching_transactions { ticket => $id, starting_transaction => $num  }
 
 Returns a reference to an array of all transactions (as hashes) on ticket $id after transaction $num.
 
 =cut
 
-sub skip_previously_seen_transactions {
+sub find_matching_transactions { 
     my $self = shift;
-    my %args = validate( @_, { ticket => 1, transactions => 1, starting_transaction => 0 } );
-    my @txns;
+    my %args = validate( @_, { ticket => 1, starting_transaction => 1 } );
+    my @raw_txns = @{$args{ticket}->history->entries};
 
-    for my $txn ( sort @{ $args{transactions} } ) {
+    my @txns;
+    # XXX TODO make this one loop.
+    for my $txn ( sort @raw_txns) {
         my $txn_date = $txn->date->epoch;
 
         # Skip things we know we've already pulled

commit ed6847cfe68a7d1b7fc3692aa0e6350efc0f97ba
Author: Jesse Vincent <jesse at bestpractical.com>
Date:   Thu May 14 15:39:54 2009 -0400

    switch rt_query to query for the RT Foreign Replica type

diff --git a/lib/App/SD/Replica/rt.pm b/lib/App/SD/Replica/rt.pm
index feda76e..7d0afd2 100644
--- a/lib/App/SD/Replica/rt.pm
+++ b/lib/App/SD/Replica/rt.pm
@@ -17,7 +17,7 @@ use Prophet::ChangeSet;
 has rt => ( isa => 'RT::Client::REST', is => 'rw');
 has remote_url => ( isa => 'Str', is => 'rw');
 has rt_queue => ( isa => 'Str', is => 'rw');
-has rt_query => ( isa => 'Str', is => 'rw');
+has query => ( isa => 'Str', is => 'rw');
 has rt_username => (isa => 'Str', is => 'rw');
 
 sub BUILD {
@@ -38,7 +38,7 @@ sub BUILD {
     }
     $self->remote_url($uri->as_string);
     $self->rt_queue($type);
-    $self->rt_query( ( $query ?  "($query) AND " :"") . " Queue = '$type'" );
+    $self->query( ( $query ?  "($query) AND " :"") . " Queue = '$type'" );
     $self->rt( RT::Client::REST->new( server => $server ) );
 
     ( $username, $password ) = $self->prompt_for_login( $uri, $username ) unless $password;
@@ -92,7 +92,7 @@ Return the replica's UUID
 
 sub uuid {
     my $self = shift;
-    return $self->uuid_for_url( join( '/', $self->remote_url, $self->rt_query ) );
+    return $self->uuid_for_url( join( '/', $self->remote_url, $self->query ) );
 
 }
 
diff --git a/lib/App/SD/Replica/rt/PullEncoder.pm b/lib/App/SD/Replica/rt/PullEncoder.pm
index 0be57e2..3e303f4 100644
--- a/lib/App/SD/Replica/rt/PullEncoder.pm
+++ b/lib/App/SD/Replica/rt/PullEncoder.pm
@@ -23,7 +23,7 @@ sub run {
     my $tickets = {};
     my @transactions;
 
-    my @tickets = $self->find_matching_tickets( $self->sync_source->rt_query );
+    my @tickets = $self->find_matching_tickets( $self->sync_source->query );
 
     $self->sync_source->log("No tickets found.") if @tickets == 0;
 

commit 9448f6b069fffc84772f2db5958f084e03abb38e
Author: Jesse Vincent <jesse at bestpractical.com>
Date:   Thu May 14 15:55:27 2009 -0400

    tidy

diff --git a/lib/App/SD/Replica/trac/PullEncoder.pm b/lib/App/SD/Replica/trac/PullEncoder.pm
index 2cc6db0..7439a05 100644
--- a/lib/App/SD/Replica/trac/PullEncoder.pm
+++ b/lib/App/SD/Replica/trac/PullEncoder.pm
@@ -48,7 +48,13 @@ sub run { # trac
         my $ticket_data         = $self->_translate_final_ticket_state($ticket);
         my $ticket_initial_data = {%$ticket_data};
 
-        my $transactions = $self->find_matching_transactions( ticket => $ticket, starting_transaction => $self->sync_source->app_handle->handle->last_changeset_from_source( $self->sync_source->uuid_for_remote_id( $ticket->id )) );
+        my $transactions = $self->find_matching_transactions(
+            ticket => $ticket,
+            starting_transaction =>
+                $self->sync_source->app_handle->handle->last_changeset_from_source(
+                $self->sync_source->uuid_for_remote_id( $ticket->id )
+                )
+        );
 
 
 

commit fc679e773adad13a80006e721dbd1b217e43dec3
Author: Jesse Vincent <jesse at bestpractical.com>
Date:   Thu May 14 15:55:29 2009 -0400

    Minor naming alignment

diff --git a/lib/App/SD/Replica/rt/PullEncoder.pm b/lib/App/SD/Replica/rt/PullEncoder.pm
index 3e303f4..2c11f48 100644
--- a/lib/App/SD/Replica/rt/PullEncoder.pm
+++ b/lib/App/SD/Replica/rt/PullEncoder.pm
@@ -30,7 +30,8 @@ sub run {
     my $counter = 0;
     $self->sync_source->log("Discovering ticket history");
 
-    my ( $last_modified, $last_txn );
+    my ( $last_txn );
+    my $previously_modified = App::SD::Util::string_to_datetime( $self->sync_source->upstream_last_modified_date );
 
     my $progress = Time::Progress->new();
     $progress->attr( max => $#tickets );
@@ -56,6 +57,7 @@ sub run {
 
     my $txn_counter = 0;
     my @changesets;
+    my $last_modified;
     for my $txn ( sort { $b->{'id'} <=> $a->{'id'} } @transactions ) {
 
         my $created = App::SD::Util::string_to_datetime( $txn->{Created} );
@@ -64,8 +66,7 @@ sub run {
         $last_txn      = $txn->{'id'} if ( !$last_txn      || ( $txn->{id} > $last_txn ) );
 
         $txn_counter++;
-        $self->sync_source->log( "Transcoding transaction  @{[$txn->{'id'}]} - $txn_counter of "
-                . scalar @transactions );
+        $self->sync_source->log( "Transcoding transaction  @{[$txn->{'id'}]} - $txn_counter of " . scalar @transactions );
         my $changeset = $self->transcode_one_txn( $txn, $tickets->{ $txn->{Ticket} } );
         $changeset->created( $txn->{'Created'} );
         next unless $changeset->has_changes;
@@ -79,13 +80,8 @@ sub run {
         $args{callback}->($_);
     }
 
-    my $last_modified_datetime
-        = App::SD::Util::string_to_datetime( $self->sync_source->upstream_last_modified_date );
-    $self->sync_source->record_upstream_last_modified_date($last_modified)
-        if ( ( $last_modified ? $last_modified->epoch : 0 )
-        > ( $last_modified_datetime ? $last_modified_datetime->epoch : 0 ) );
-    $self->sync_source->record_upstream_last_txn($last_txn)
-        if ( ( $last_txn || 0 ) > ( $self->sync_source->upstream_last_txn || 0 ) );
+    $self->sync_source->record_upstream_last_modified_date($last_modified) if ( ( $last_modified ? $last_modified->epoch : 0 ) > ( $previously_modified ? $previously_modified->epoch : 0 ) );
+    $self->sync_source->record_upstream_last_txn($last_txn) if ( ( $last_txn || 0 ) > ( $self->sync_source->upstream_last_txn || 0 ) );
 }
 
 sub _translate_final_ticket_state {

commit d6c20712bc1b318dc952cb96f7185e8f0aeb6fbd
Author: Jesse Vincent <jesse at bestpractical.com>
Date:   Thu May 14 16:10:48 2009 -0400

    switch from "loop over tickets, then loop over txns" to loop over tickets, looping internally on txns

diff --git a/lib/App/SD/Replica/rt/PullEncoder.pm b/lib/App/SD/Replica/rt/PullEncoder.pm
index 2c11f48..461d2c8 100644
--- a/lib/App/SD/Replica/rt/PullEncoder.pm
+++ b/lib/App/SD/Replica/rt/PullEncoder.pm
@@ -21,7 +21,6 @@ sub run {
     );
 
     my $tickets = {};
-    my @transactions;
 
     my @tickets = $self->find_matching_tickets( $self->sync_source->query );
 
@@ -30,9 +29,10 @@ sub run {
     my $counter = 0;
     $self->sync_source->log("Discovering ticket history");
 
-    my ( $last_txn );
+    my ( $last_txn, @changesets );
     my $previously_modified = App::SD::Util::string_to_datetime( $self->sync_source->upstream_last_modified_date );
 
+    my $last_modified;
     my $progress = Time::Progress->new();
     $progress->attr( max => $#tickets );
 
@@ -44,20 +44,10 @@ sub run {
 
         $self->sync_source->log( "Fetching ticket $id - $counter of " . scalar @tickets );
 
-        $tickets->{$id} = $self->_translate_final_ticket_state(
-            $self->sync_source->rt->show( type => 'ticket', id => $id ) );
-        push @transactions, @{
-            $self->find_matching_transactions(
-                ticket               => $id,
-                starting_transaction => ( ( $args{'after'} + 1 ) || 1 )
-
-            )
-            };
-    }
+        $tickets->{$id} = $self->_translate_final_ticket_state( $self->sync_source->rt->show( type => 'ticket', id => $id ) );
+        my @transactions =  @{ $self->find_matching_transactions( ticket               => $id, starting_transaction => ( ( $args{'after'} + 1 ) || 1 ) ) };
 
     my $txn_counter = 0;
-    my @changesets;
-    my $last_modified;
     for my $txn ( sort { $b->{'id'} <=> $a->{'id'} } @transactions ) {
 
         my $created = App::SD::Util::string_to_datetime( $txn->{Created} );
@@ -72,6 +62,7 @@ sub run {
         next unless $changeset->has_changes;
         unshift @changesets, $changeset;
     }
+    }
 
     my $cs_counter = 0;
     for (@changesets) {

commit 1a02fd13c7570d416a9e76678f0465ef42dbef35
Author: Jesse Vincent <jesse at bestpractical.com>
Date:   Thu May 14 16:20:54 2009 -0400

    Get rid of the big hash of old tickets

diff --git a/lib/App/SD/Replica/rt/PullEncoder.pm b/lib/App/SD/Replica/rt/PullEncoder.pm
index 461d2c8..256fea9 100644
--- a/lib/App/SD/Replica/rt/PullEncoder.pm
+++ b/lib/App/SD/Replica/rt/PullEncoder.pm
@@ -20,7 +20,6 @@ sub run {
         }
     );
 
-    my $tickets = {};
 
     my @tickets = $self->find_matching_tickets( $self->sync_source->query );
 
@@ -44,30 +43,29 @@ sub run {
 
         $self->sync_source->log( "Fetching ticket $id - $counter of " . scalar @tickets );
 
-        $tickets->{$id} = $self->_translate_final_ticket_state( $self->sync_source->rt->show( type => 'ticket', id => $id ) );
-        my @transactions =  @{ $self->find_matching_transactions( ticket               => $id, starting_transaction => ( ( $args{'after'} + 1 ) || 1 ) ) };
+        my $ticket= $self->_translate_final_ticket_state( $self->sync_source->rt->show( type => 'ticket', id => $id ) );
+        my @transactions = @{ $self->find_matching_transactions( ticket               => $id, starting_transaction => ( ( $args{'after'} + 1 ) || 1 )) };
 
-    my $txn_counter = 0;
-    for my $txn ( sort { $b->{'id'} <=> $a->{'id'} } @transactions ) {
+        my $txn_counter = 0;
+        for my $txn ( sort { $b->{'id'} <=> $a->{'id'} } @transactions ) {
 
-        my $created = App::SD::Util::string_to_datetime( $txn->{Created} );
+            my $created = App::SD::Util::string_to_datetime( $txn->{Created} );
 
-        $last_modified = $created     if ( !$last_modified || ( $created > $last_modified ) );
-        $last_txn      = $txn->{'id'} if ( !$last_txn      || ( $txn->{id} > $last_txn ) );
+            $last_modified = $created     if ( !$last_modified || ( $created > $last_modified ) );
+            $last_txn      = $txn->{'id'} if ( !$last_txn      || ( $txn->{id} > $last_txn ) );
 
-        $txn_counter++;
-        $self->sync_source->log( "Transcoding transaction  @{[$txn->{'id'}]} - $txn_counter of " . scalar @transactions );
-        my $changeset = $self->transcode_one_txn( $txn, $tickets->{ $txn->{Ticket} } );
-        $changeset->created( $txn->{'Created'} );
-        next unless $changeset->has_changes;
-        unshift @changesets, $changeset;
-    }
+            $txn_counter++;
+            $self->sync_source->log( "Transcoding transaction  @{[$txn->{'id'}]} - $txn_counter of " . scalar @transactions );
+            my $changeset = $self->transcode_one_txn( $txn, $ticket );
+            $changeset->created( $txn->{'Created'} );
+            next unless $changeset->has_changes;
+            unshift @changesets, $changeset;
+        }
     }
 
     my $cs_counter = 0;
     for (@changesets) {
-        $self->sync_source->log(
-            "Applying changeset " . ++$cs_counter . " of " . scalar @changesets );
+        $self->sync_source->log( "Applying changeset " . ++$cs_counter . " of " . scalar @changesets );
         $args{callback}->($_);
     }
 

commit 1a3e560bbc64b2a0ab8bd808c2f66088146ef60b
Author: Jesse Vincent <jesse at bestpractical.com>
Date:   Thu May 14 18:00:51 2009 -0400

    Starting to refactor foreign replica support - RT will now work like trac and record foreign replicas for each ticket

diff --git a/lib/App/SD/ForeignReplica.pm b/lib/App/SD/ForeignReplica.pm
index ef44086..4348473 100644
--- a/lib/App/SD/ForeignReplica.pm
+++ b/lib/App/SD/ForeignReplica.pm
@@ -68,7 +68,7 @@ sub record_pushed_transactions {
 
         # if the transaction id is older than the id of the last changeset
         # we got from the original source of this changeset, we're done
-        last if $txn->{id} <= $self->upstream_last_txn($args{changeset});
+        last if $txn->{id} <= $self->upstream_last_txn($args{changeset}->original_source_uuid);
         
         # if the transaction from RT is more recent than the most recent
         # transaction we got from the original source of the changeset
diff --git a/lib/App/SD/Replica/trac.pm b/lib/App/SD/Replica/trac.pm
index b190c6e..45c384a 100644
--- a/lib/App/SD/Replica/trac.pm
+++ b/lib/App/SD/Replica/trac.pm
@@ -62,8 +62,8 @@ sub get_txn_list_by_date {
 
 sub upstream_last_txn { 
     my $self = shift;
-    my $changeset = shift;
-    return $self->app_handle->handle->last_changeset_from_source( $changeset->original_source_uuid);
+    my $uuid = shift;
+    return $self->app_handle->handle->last_changeset_from_source( $uuid);
 }
 
 =head2 uuid

commit 6a5853eec7b9eee0ff219cc6e9df0786d37434bd
Author: Jesse Vincent <jesse at bestpractical.com>
Date:   Thu May 14 20:48:15 2009 -0400

    finish making the rt encoder use the same foreign replica id scheme as trac. remove some duplicate code

diff --git a/lib/App/SD/ForeignReplica/PushEncoder.pm b/lib/App/SD/ForeignReplica/PushEncoder.pm
index c80d491..f10ad4a 100644
--- a/lib/App/SD/ForeignReplica/PushEncoder.pm
+++ b/lib/App/SD/ForeignReplica/PushEncoder.pm
@@ -1,8 +1,68 @@
 package App::SD::ForeignReplica::PushEncoder;
 use Any::Moose;
+use Params::Validate;
 
 
+sub integrate_change {
+    my $self = shift;
+    my ( $change, $changeset ) = validate_pos(
+        @_,
+        { isa => 'Prophet::Change' },
+        { isa => 'Prophet::ChangeSet' }
+    );
+    my ($id, $record);
 
+    # If the original_sequence_no of this changeset is <= the sequence number of
+    # the last changeset our sync source recorded from its original source, we can skip it.
+    # XXX TODO - this logic should be at the changeset level, not the change level, as it applies to all
+    # changes in the changeset
+    #
+    return
+        if $self->sync_source->app_handle->handle->last_changeset_from_source(
+                $changeset->original_source_uuid
+        ) >= $changeset->original_sequence_no;
+
+    my $before_integration = time();
+
+    eval {
+        if (    $change->record_type eq 'ticket'
+            and $change->change_type eq 'add_file' ) {
+            $id = $self->integrate_ticket_create( $change, $changeset );
+            $self->sync_source->record_remote_id_for_pushed_record(
+                uuid      => $change->record_uuid,
+                remote_id => $id);
+        }
+        elsif ( $change->record_type eq 'attachment'
+            and $change->change_type eq 'add_file') {
+            $id = $self->integrate_attachment( $change, $changeset );
+        }
+        elsif ( $change->record_type eq 'comment'
+            and $change->change_type eq 'add_file' ) {
+            $id = $self->integrate_comment( $change, $changeset );
+        }
+        elsif ( $change->record_type eq 'ticket' ) {
+            $id = $self->integrate_ticket_update( $change, $changeset );
+        }
+        else {
+            $self->sync_source->log('I have no idea what I am doing for '.$change->record_uuid);
+            return undef;
+        }
+
+        $self->sync_source->record_pushed_transactions(
+            start_time => $before_integration,
+            ticket    => $id,
+            changeset => $changeset);
+    };
+
+    if (my $err = $@) {
+        $self->sync_source->log("Push error: ".$err);
+    }
+
+    $self->after_integrate_change();
+    return $id;
+}
+
+sub after_integrate_change {}
 
 no Any::Moose;
 __PACKAGE__->meta->make_immutable;
diff --git a/lib/App/SD/Replica/rt.pm b/lib/App/SD/Replica/rt.pm
index 7d0afd2..107a7e4 100644
--- a/lib/App/SD/Replica/rt.pm
+++ b/lib/App/SD/Replica/rt.pm
@@ -48,11 +48,8 @@ sub BUILD {
     $self->rt->login( username => $username, password => $password );
 }
 
-
 sub foreign_username { return shift->rt_username(@_)}
 
-
-
 sub get_txn_list_by_date {
     my $self   = shift;
     my $ticket = shift;
@@ -71,17 +68,10 @@ sub get_txn_list_by_date {
     return @txns;
 }
 
-
-
 sub upstream_last_txn {
     my $self = shift;
-    return $self->fetch_local_metadata('last_txn_id');
-}
-
-sub record_upstream_last_txn {
-    my $self = shift;
-    my $id = shift;
-    return $self->store_local_metadata('last_txn_id' => $id);
+    my $uuid = shift;
+    return $self->app_handle->handle->last_changeset_from_source( $uuid);
 }
 
 =head2 uuid
diff --git a/lib/App/SD/Replica/rt/PullEncoder.pm b/lib/App/SD/Replica/rt/PullEncoder.pm
index 256fea9..4faff30 100644
--- a/lib/App/SD/Replica/rt/PullEncoder.pm
+++ b/lib/App/SD/Replica/rt/PullEncoder.pm
@@ -22,7 +22,6 @@ sub run {
 
 
     my @tickets = $self->find_matching_tickets( $self->sync_source->query );
-
     $self->sync_source->log("No tickets found.") if @tickets == 0;
 
     my $counter = 0;
@@ -37,18 +36,25 @@ sub run {
 
     local $| = 1;
 
-    for my $id (@tickets) {
+    for my $ticket (@tickets) {
         $counter++;
+
+        my $ticket_id = $ticket->{id};
+
         print $progress->report( "%30b %p Est: %E\r", $counter );
+    
+        $self->sync_source->log( "Fetching ticket $ticket_id - $counter of " . scalar @tickets );
 
-        $self->sync_source->log( "Fetching ticket $id - $counter of " . scalar @tickets );
+        my $final_state= $self->_translate_final_ticket_state( $ticket);
+        my @transactions = $self->find_matching_transactions( 
+                ticket               => $ticket, 
+                starting_transaction => $self->sync_source->app_handle->handle->last_changeset_from_source( $self->sync_source->uuid_for_remote_id( $ticket_id )
 
-        my $ticket= $self->_translate_final_ticket_state( $self->sync_source->rt->show( type => 'ticket', id => $id ) );
-        my @transactions = @{ $self->find_matching_transactions( ticket               => $id, starting_transaction => ( ( $args{'after'} + 1 ) || 1 )) };
+            
+                                          )|| 1 ) ;
 
         my $txn_counter = 0;
         for my $txn ( sort { $b->{'id'} <=> $a->{'id'} } @transactions ) {
-
             my $created = App::SD::Util::string_to_datetime( $txn->{Created} );
 
             $last_modified = $created     if ( !$last_modified || ( $created > $last_modified ) );
@@ -56,7 +62,7 @@ sub run {
 
             $txn_counter++;
             $self->sync_source->log( "Transcoding transaction  @{[$txn->{'id'}]} - $txn_counter of " . scalar @transactions );
-            my $changeset = $self->transcode_one_txn( $txn, $ticket );
+            my $changeset = $self->transcode_one_txn( $txn, $final_state );
             $changeset->created( $txn->{'Created'} );
             next unless $changeset->has_changes;
             unshift @changesets, $changeset;
@@ -70,7 +76,7 @@ sub run {
     }
 
     $self->sync_source->record_upstream_last_modified_date($last_modified) if ( ( $last_modified ? $last_modified->epoch : 0 ) > ( $previously_modified ? $previously_modified->epoch : 0 ) );
-    $self->sync_source->record_upstream_last_txn($last_txn) if ( ( $last_txn || 0 ) > ( $self->sync_source->upstream_last_txn || 0 ) );
+#    $self->sync_source->record_upstream_last_txn($last_txn) if ( ( $last_txn || 0 ) > ( $self->sync_source->upstream_last_txn || 0 ) );
 }
 
 sub _translate_final_ticket_state {
@@ -130,15 +136,18 @@ Returns an RT::Client ticket collection for all tickets found matching your QUER
 
 sub find_matching_tickets {
     my $self = shift;
-    my ($query) = validate_pos(@_, 1);
+    my ($query) = validate_pos( @_, 1 );
 
     # If we've ever synced, we can limit our search to only newer things
-    if (my $before = $self->_only_pull_tickets_modified_after) {
-        $query = "($query) AND LastUpdated >= '".$before->ymd('-'). " " .$before->hms(':') ."'";
-        $self->sync_source->log("Skipping all tickets not updated since ".$before->iso8601);
+    if ( my $before = $self->_only_pull_tickets_modified_after ) {
+       $query = "($query) AND LastUpdated >= '" . $before->ymd('-') . " " . $before->hms(':') . "'";
+        $self->sync_source->log( "Skipping all tickets not updated since " . $before->iso8601 );
     }
-
-    return $self->sync_source->rt->search( type => 'ticket', query => $query );
+    return map {
+        my $hash = $self->sync_source->rt->show( type => 'ticket', id => $_ );
+        $hash->{id} =~ s|^ticket/||g;
+        $hash
+    } $self->sync_source->rt->search( type => 'ticket', query => $query );
 }
 
 
@@ -154,27 +163,24 @@ sub find_matching_transactions {
     my @txns;
 
     my $rt_handle = $self->sync_source->rt;
+    
+    my $ticket_id = $args{ticket}->{$self->sync_source->uuid . '-id'};
 
-     my $latest = $self->sync_source->upstream_last_txn() || 0;
-    for my $txn ( sort $rt_handle->get_transaction_ids( parent_id => $args{'ticket'} ) ) {
+     my $latest = $self->sync_source->upstream_last_txn($self->sync_source->uuid_for_remote_id( $ticket_id )) || 0;
+    for my $txn ( sort $rt_handle->get_transaction_ids( parent_id => $ticket_id)) {
         # Skip things calling code told us to skip
         next if $txn < $args{'starting_transaction'}; 
         # skip things we had on our last pull
         next if $txn <=  $latest;
-
         # Skip things we've pushed
-        next if $self->sync_source->foreign_transaction_originated_locally($txn, $args{'ticket'});
+        next if $self->sync_source->foreign_transaction_originated_locally($txn, $ticket_id);
 
-        my $txn_hash = $rt_handle->get_transaction(
-            parent_id => $args{'ticket'},
-            id        => $txn,
-            type      => 'ticket'
-        );
+        my $txn_hash = $rt_handle->get_transaction( parent_id => $ticket_id, id        => $txn, type      => 'ticket');
         if ( my $attachments = delete $txn_hash->{'Attachments'} ) {
             for my $attach ( split( /\n/, $attachments ) ) {
                 next unless ( $attach =~ /^(\d+):/ );
                 my $id = $1;
-                my $a  = $rt_handle->get_attachment( parent_id => $args{'ticket'}, id        => $id);
+                my $a  = $rt_handle->get_attachment( parent_id => $ticket_id, id        => $id);
 
                 push( @{ $txn_hash->{_attachments} }, $a ) if ( $a->{Filename} );
 
@@ -183,7 +189,7 @@ sub find_matching_transactions {
         }
         push @txns, $txn_hash;
     }
-    return \@txns;
+    return @txns;
 }
 
 sub transcode_one_txn {
@@ -194,7 +200,7 @@ sub transcode_one_txn {
         die "Transaction type $txn->{Type} (for transaction $txn->{id}) not implemented yet";
     }
     my $changeset = Prophet::ChangeSet->new(
-        {   original_source_uuid => $self->sync_source->uuid,
+        {   original_source_uuid => $self->sync_source->uuid_for_remote_id( $ticket->{ $self->sync_source->uuid . '-id' } ),
             original_sequence_no => $txn->{'id'},
             creator => $self->resolve_user_id_to( email_address => $txn->{'Creator'} ),
         }
@@ -203,8 +209,7 @@ sub transcode_one_txn {
     if ( $txn->{'Ticket'} ne $ticket->{$self->sync_source->uuid . '-id'}
         && $txn->{'Type'} !~ /^(?:Comment|Correspond)$/
     ) {
-        warn "Skipping a data change from a merged ticket" . $txn->{'Ticket'}
-            .' vs '. $ticket->{$self->sync_source->uuid . '-id'};
+        warn "Skipping a data change from a merged ticket" . $txn->{'Ticket'} .' vs '. $ticket->{$self->sync_source->uuid . '-id'};
         next;
     }
 
diff --git a/lib/App/SD/Replica/rt/PushEncoder.pm b/lib/App/SD/Replica/rt/PushEncoder.pm
index 15649d2..6694a1b 100644
--- a/lib/App/SD/Replica/rt/PushEncoder.pm
+++ b/lib/App/SD/Replica/rt/PushEncoder.pm
@@ -1,67 +1,15 @@
 package App::SD::Replica::rt::PushEncoder;
 use Any::Moose; 
+
+extends 'App::SD::ForeignReplica::PushEncoder';
+
 use Params::Validate;
 use Path::Class;
+
 has sync_source => 
     ( isa => 'App::SD::Replica::rt',
       is => 'rw');
 
-
-sub integrate_change {
-    my $self = shift;
-    my ( $change, $changeset ) = validate_pos(
-        @_,
-        { isa => 'Prophet::Change' },
-        { isa => 'Prophet::ChangeSet' }
-    );
-    my $id;
-    local $@;
-
-    my $before_integration = time();
-
-    eval {
-        if (    $change->record_type eq 'ticket'
-            and $change->change_type eq 'add_file' )
-        {
-            $id = $self->integrate_ticket_create( $change, $changeset );
-            $self->sync_source->record_remote_id_for_pushed_record(
-                uuid      => $change->record_uuid,
-                remote_id => $id
-            );
-
-        }
-        elsif (
-                $change->record_type eq 'attachment'
-            and $change->change_type eq 'add_file'
-
-          )
-        {
-            $id = $self->integrate_attachment( $change, $changeset );
-        }
-        elsif ( $change->record_type eq 'comment'
-            and $change->change_type eq 'add_file' )
-        {
-            $id = $self->integrate_comment( $change, $changeset );
-        }
-        elsif ( $change->record_type eq 'ticket' ) {
-            $id = $self->integrate_ticket_update( $change, $changeset );
-
-        }
-        else {
-            return undef;
-        }
-
-        $self->sync_source->record_pushed_transactions(
-            ticket    => $id,
-            start_time => $before_integration,
-            changeset => $changeset
-        );
-
-    };
-    warn $@ if $@;
-    return $id;
-}
-
 sub integrate_ticket_update {
     my $self = shift;
     my ( $change, $changeset ) = validate_pos(
@@ -71,8 +19,7 @@ sub integrate_ticket_update {
     );
 
     # Figure out the remote site's ticket ID for this change's record
-    my $remote_ticket_id =
-      $self->sync_source->remote_id_for_uuid( $change->record_uuid );
+    my $remote_ticket_id = $self->sync_source->remote_id_for_uuid( $change->record_uuid );
     my $ticket = RT::Client::REST::Ticket->new(
         rt => $self->sync_source->rt,
         id => $remote_ticket_id,
diff --git a/lib/App/SD/Replica/trac/PushEncoder.pm b/lib/App/SD/Replica/trac/PushEncoder.pm
index 7621715..a3f2727 100644
--- a/lib/App/SD/Replica/trac/PushEncoder.pm
+++ b/lib/App/SD/Replica/trac/PushEncoder.pm
@@ -3,68 +3,15 @@ use Any::Moose;
 use Params::Validate;
 use Path::Class;
 use Time::HiRes qw/usleep/;
-
 has sync_source => 
     ( isa => 'App::SD::Replica::trac',
       is => 'rw');
 
-sub integrate_change {
-    my $self = shift;
-    my ( $change, $changeset ) = validate_pos(
-        @_,
-        { isa => 'Prophet::Change' },
-        { isa => 'Prophet::ChangeSet' }
-    );
-    my ($id, $record);
-
-    # if the original_sequence_no of this changeset is <= 
-    # the last changeset our sync source for the original_sequence_no, we can skip it.
-    # XXX TODO - this logic should be at the changeset level, not the cahnge level, as it applies to all
-    # changes in the changeset
-    return
-        if $self->sync_source->app_handle->handle->last_changeset_from_source(
-                $changeset->original_source_uuid
-        ) >= $changeset->original_sequence_no;
-
-    my $before_integration = time();
-
-    eval {
-        if (    $change->record_type eq 'ticket'
-            and $change->change_type eq 'add_file' ) {
-            $id = $self->integrate_ticket_create( $change, $changeset );
-            $self->sync_source->record_remote_id_for_pushed_record(
-                uuid      => $change->record_uuid,
-                remote_id => $id);
-        }
-        elsif ( $change->record_type eq 'attachment'
-            and $change->change_type eq 'add_file') {
-            $id = $self->integrate_attachment( $change, $changeset );
-        }
-        elsif ( $change->record_type eq 'comment'
-            and $change->change_type eq 'add_file' ) {
-            $id = $self->integrate_comment( $change, $changeset );
-        }
-        elsif ( $change->record_type eq 'ticket' ) {
-            $id = $self->integrate_ticket_update( $change, $changeset );
-        }
-        else {
-            $self->sync_source->log('I have no idea what I am doing for '.$change->record_uuid);
-            return undef;
-        }
-
-        $self->sync_source->record_pushed_transactions(
-            start_time => $before_integration,
-            ticket    => $id,
-            changeset => $changeset);
-    };
-
-    if (my $err = $@) {
-        $self->sync_source->log("Push error: ".$err);
-    }
+extends 'App::SD::ForeignReplica::PushEncoder';
 
-    usleep(1100); # trac only accepts one ticket update per second. Yes. 
 
-    return $id;
+sub after_integrate_change {
+  usleep(1100); # trac only accepts one ticket update per second. Yes. 
 }
 
 sub integrate_ticket_update {
diff --git a/t/sd-rt/basic.t b/t/sd-rt/basic.t
index 6664aff..4bc3d56 100644
--- a/t/sd-rt/basic.t
+++ b/t/sd-rt/basic.t
@@ -67,7 +67,7 @@ RT::Client::REST::Ticket->new(
 )->store();
 
 ( $ret, $out, $err ) = run_script( 'sd', [ 'pull', '--from', $sd_rt_url ] );
-
+diag($err);
 
 run_output_matches(
     'sd',
@@ -112,14 +112,19 @@ RT::Client::REST::Ticket->new(
     id     => $ticket->id,
     status => 'stalled',
 )->store();
-
+diag("Making ".$ticket->id." stalled");
 ( $ret, $out, $err ) = run_script( 'sd', [ 'pull', '--from', $sd_rt_url ] );
-
+diag($out);
+diag($err);
 run_output_matches_unordered(
     'sd',
     [ 'ticket',              'list', '--regex', '.' ],
     [ "$yatta_id YATTA new", "$flyman_id Fly Man stalled", ]
 );
+( $ret, $out, $err ) = run_script( 'sd', [ 'ticket' ,'list', '--regex', '.']);
+
+diag($out);
+diag($err); 
 
 RT::Client::REST::Ticket->new(
     rt     => $rt,

-----------------------------------------------------------------------



More information about the Bps-public-commit mailing list