jesse at bestpractical.com jesse at bestpractical.com
Wed Jul 9 15:42:05 EDT 2008

Author: jesse
Date: Wed Jul  9 15:41:56 2008
New Revision: 13924


* Extracting the push and pull specific bits of logic into separate classes

Modified: sd/trunk/lib/App/SD/Replica/RT.pm
--- sd/trunk/lib/App/SD/Replica/RT.pm	(original)
+++ sd/trunk/lib/App/SD/Replica/RT.pm	Wed Jul  9 15:41:56 2008
@@ -17,58 +17,6 @@
 has rt_queue => ( isa => 'Str', is => 'rw');
 has rt_query => ( isa => 'Str', is => 'rw');
-If the remote storage (RT) cannot represent a whole changeset along with the prophet changeset uuid, then we need to 
-create a separate locally(?) stored map of:
-    remote-subchangeset-identifier to changeset uuid.
-    remote id to prophet record uuid
-For each sync of the same remote source (RT), we need a unique prophet database domain.
-if clkao syncs from RT, jesse can sync with clkao but not with RT directly with the same database.
-Push to rt algorithm
-apply a single changeset that's part of the push:
-    - for each record in that changeset:
-        - pull the record's txn list from the server
-        - for each txn we don't know we've already seen, look at it
-            - if it is from the changeset we just pushed, then
-                store the id of the new transaction and originating uuid in the push-ticket store.
-                    - does that let us specify individual txns? or is it a high-water mark?
-             - if it is _not_ from the changeset we just pushed, then 
-                do we just ignore it?
-                how do we mark an out-of-order transaction as not-pulled?
-Changesets we want to push from SD to RT and how they map
-what do we do with cfs rt doesn't know about?
-=head2 setup
 # XXX: this should be called from superclass, or better, have individual attributes have their own builders.
 around 'new' => sub {
@@ -252,220 +200,83 @@
         { isa => 'Prophet::Change' },
         { isa => 'Prophet::ChangeSet' }
-    my $id;
-    eval {
-        if (    $change->record_type eq 'ticket'
-            and $change->change_type eq 'add_file' 
-    )
-        {
-            $id = $self->integrate_ticket_create( $change, $changeset );
-            $self->record_pushed_ticket(
-                uuid      => $change->record_uuid,
-                remote_id => $id
-            );
-        } elsif ( $change->record_type eq 'attachment'
-            and $change->change_type eq 'add_file' 
-        ) {
-            $id = $self->integrate_attachment( $change, $changeset );
-        } elsif ( $change->record_type eq 'comment' 
-            and $change->change_type eq 'add_file' 
-        ) {
-            $id = $self->integrate_comment( $change, $changeset );
-        } elsif ( $change->record_type eq 'ticket' ) {
-            $id = $self->integrate_ticket_update( $change, $changeset );
+    require App::SD::Replica::RT::PushEncoder;
+    my $recoder = App::SD::Replica::RT::PushEncoder->new( { sync_source => $self } );
+    $recoder->integrate_change($change,$changeset);
-        } else {
-            return undef;
-        }
+=head2 uuid
-        $self->record_pushed_transactions(
-            ticket    => $id,
-            changeset => $changeset
-        );
+Return the replica SVN repository's UUID
-    };
-    warn $@ if $@;
-    return $id;
-sub integrate_ticket_update {
+sub uuid {
     my $self = shift;
-    my ( $change, $changeset ) = validate_pos(
-        @_,
-        { isa => 'Prophet::Change' },
-        { isa => 'Prophet::ChangeSet' }
-    );
-    # Figure out the remote site's ticket ID for this change's record
-    my $remote_ticket_id = $self->remote_id_for_uuid( $change->record_uuid );
-    my $ticket           = RT::Client::REST::Ticket->new(
-        rt => $self->rt,
-        id => $remote_ticket_id,
-        %{ $self->_recode_props_for_integrate($change) }
-    )->store();
+    return $self->uuid_for_url( join( '/', $self->rt_url, $self->rt_query ) );
-    return $remote_ticket_id;
-sub integrate_ticket_create {
+sub traverse_changesets {
     my $self = shift;
-    my ( $change, $changeset ) = validate_pos(
-        @_,
-        { isa => 'Prophet::Change' },
-        { isa => 'Prophet::ChangeSet' }
+    my %args = validate( @_,
+        {   after    => 1,
+            callback => 1,
+        }
-    # Build up a ticket object out of all the record's attributes
-    my $ticket = RT::Client::REST::Ticket->new(
-        rt    => $self->rt,
-        queue => $self->rt_queue(),
-        %{ $self->_recode_props_for_integrate($change) }
-    )->store( text => "Not yet pulling in ticket creation comment" );
+    require App::SD::Replica::RT::PullEncoder;
+    my $recoder = App::SD::Replica::RT::PullEncoder->new( { sync_source => $self } );
+    $recoder->pull( query => $self->rt_query, after => $args{'after'}, callback => $args{'callback'});
-    return $ticket->id;
-sub integrate_comment {
-    my $self = shift;
-    my ($change, $changeset) = validate_pos( @_, { isa => 'Prophet::Change' }, {isa => 'Prophet::ChangeSet'} );
-    # Figure out the remote site's ticket ID for this change's record
-    my %props = map { $_->name => $_->new_value } $change->prop_changes;
+If the remote storage (RT) cannot represent a whole changeset along with the prophet changeset uuid, then we need to 
+create a separate locally(?) stored map of:
+    remote-subchangeset-identifier to changeset uuid.
+    remote id to prophet record uuid
-    my $ticket_id     = $self->remote_id_for_uuid( $props{'ticket'} );
-    my $ticket = RT::Client::REST::Ticket->new( rt => $self->rt, id => $ticket_id);
+For each sync of the same remote source (RT), we need a unique prophet database domain.
-    my %content = ( message => $props{'content'},   
-                );
+if clkao syncs from RT, jesse can sync with clkao but not with RT directly with the same database.
-    if (  ($props{'type'} ||'') eq 'comment' ) {
-        $ticket->comment( %content);
-    } else {
-        $ticket->correspond(%content);
-    }
-    return $ticket_id;
-sub integrate_attachment {
-    my ($self, $change, $changeset ) = validate_pos( @_, { isa => 'App::SD::Replica::RT'}, { isa => 'Prophet::Change' }, { isa => 'Prophet::ChangeSet' });
+Push to rt algorithm
+apply a single changeset that's part of the push:
+    - for each record in that changeset:
+        - pull the record's txn list from the server
+        - for each txn we don't know we've already seen, look at it
+            - if it is from the changeset we just pushed, then
+                store the id of the new transaction and originating uuid in the push-ticket store.
+                    - does that let us specify individual txns? or is it a high-water mark?
+             - if it is _not_ from the changeset we just pushed, then 
+                do we just ignore it?
+                how do we mark an out-of-order transaction as not-pulled?
-    my %props = map { $_->name => $_->new_value } $change->prop_changes;
-    my $ticket_id = $self->remote_id_for_uuid( $props{'ticket'});
-    my $ticket = RT::Client::REST::Ticket->new( rt => $self->rt, id => $ticket_id );
-    my $tempdir = File::Temp::tempdir( CLEANUP => 1 );
-    my $file = file( $tempdir => ( $props{'name'} || 'unnamed' ) );
-    my $fh = $file->openw;
-    print $fh $props{content};
-    close $fh;
-    my %content = ( message     => '(See attachments)', attachments => ["$file"]);
-    $ticket->correspond(%content);
-    return $ticket_id;
+Changesets we want to push from SD to RT and how they map
+what do we do with cfs rt doesn't know about?
-sub _recode_props_for_integrate {
-    my $self = shift;
-    my ($change) = validate_pos( @_, { isa => 'Prophet::Change' } );
-    my %props = map { $_->name => $_->new_value } $change->prop_changes;
-    my %attr;
-    for my $key ( keys %props ) {
-        next unless ( $key =~ /^(summary|queue|status|owner|custom)/ );
-        if ( $key =~ /^custom-(.*)/ ) {
-            $attr{cf}->{$1} = $props{$key};
-        } elsif ( $key eq 'summary' ) {
-            $attr{'subject'} = $props{summary};
-        } else {
-            $attr{$key} = $props{$key};
-        }
-        if ( $key eq 'status' ) {
-            $attr{$key} =~ s/^closed$/resolved/;
-        }
-    }
-    return \%attr;
-=head2 uuid
-Return the replica SVN repository's UUID
-sub uuid {
-    my $self = shift;
-    return $self->uuid_for_url( join( '/', $self->rt_url, $self->rt_query ) );
-sub traverse_changesets {
-    my $self = shift;
-    my %args = validate(
-        @_,
-        {   after    => 1,
-            callback => 1,
-        }
-    );
-    my $first_rev = ( $args{'after'} + 1 ) || 1;
-    require App::SD::Replica::RT::PullEncoder;
-    my $recoder
-        = App::SD::Replica::RT::PullEncoder->new( { sync_source => $self } );
-    for my $id ( $self->find_matching_tickets ) {
-        # XXX: _recode_transactions should ignore txn-id <= $first_rev
-        $args{callback}->($_)
-            for @{
-            $recoder->run(
-                ticket => $self->rt->show( type => 'ticket', id => $id ),
-                transactions => $self->find_matching_transactions(
-                    ticket               => $id,
-                    starting_transaction => $first_rev
-                ),
-            )
-            };
-    }
-sub find_matching_tickets {
-    my $self = shift;
-    return $self->rt->search( type => 'ticket', query => $self->rt_query );
-sub find_matching_transactions {
-    my $self = shift;
-    my %args = validate( @_, { ticket => 1, starting_transaction => 1 } );
-    my @txns;
-    for my $txn ( sort $self->rt->get_transaction_ids( parent_id => $args{ticket} ) ) {
-        next if $txn < $args{'starting_transaction'}; # Skip things we've pushed
-        next if $self->prophet_has_seen_transaction($txn);
-        my $txn_hash = $self->rt->get_transaction(
-            parent_id => $args{ticket},
-            id        => $txn,
-            type      => 'ticket'
-        );
-        if ( my $attachments = delete $txn_hash->{'Attachments'} ) {
-            foreach my $attach ( split( /\n/, $attachments ) ) {
-                next unless ( $attach =~ /^(\d+):/ );
-                my $id = $1;
-                my $a  = $self->rt->get_attachment( parent_id => $args{'ticket'}, id        => $id);
-                push( @{ $txn_hash->{_attachments} }, $a )
-                    if ( $a->{Filename} );
+=head2 setup
-            }
-        }
-        push @txns, $txn_hash;
-    }
-    return \@txns;

Modified: sd/trunk/lib/App/SD/Replica/RT/PullEncoder.pm
--- sd/trunk/lib/App/SD/Replica/RT/PullEncoder.pm	(original)
+++ sd/trunk/lib/App/SD/Replica/RT/PullEncoder.pm	Wed Jul  9 15:41:56 2008
@@ -4,7 +4,6 @@
 package App::SD::Replica::RT::PullEncoder;
 use Moose;
-use base qw/Class::Accessor/;
 use Params::Validate qw(:all);
 use UNIVERSAL::require;
@@ -17,6 +16,75 @@
 our $DEBUG = $Prophet::Handle::DEBUG;
+sub pull {
+    my $self = shift;
+    my %args = validate( @_,
+        {   after    => 1,
+            callback => 1,
+            query => 1
+        }
+    );
+    my $first_rev = ( $args{'after'} + 1 ) || 1;
+    for my $id ( $self->find_matching_tickets($args{'query'} )) {
+        # XXX: _recode_transactions should ignore txn-id <= $first_rev
+        $args{callback}->($_)
+            for @{
+            $self->run(
+                ticket => $self->sync_source->rt->show( type => 'ticket', id => $id ),
+                transactions => $self->find_matching_transactions(
+                    ticket               => $id,
+                    starting_transaction => $first_rev
+                ),
+            )
+            };
+    }
+sub find_matching_tickets {
+    my $self = shift;
+    my ($query) = validate_pos(@_, 1);
+    return $self->sync_source->rt->search( type => 'ticket', query => $query );
+sub find_matching_transactions {
+    my $self = shift;
+    my %args = validate( @_, { ticket => 1, starting_transaction => 1 } );
+    my @txns;
+    my $rt_handle = $self->sync_source->rt;
+    for my $txn ( sort $rt_handle->get_transaction_ids( parent_id => $args{ticket} ) ) {
+        next if $txn < $args{'starting_transaction'}; # Skip things we've pushed
+        next if $self->sync_source->prophet_has_seen_transaction($txn);
+        my $txn_hash = $rt_handle->get_transaction(
+            parent_id => $args{ticket},
+            id        => $txn,
+            type      => 'ticket'
+        );
+        if ( my $attachments = delete $txn_hash->{'Attachments'} ) {
+            foreach my $attach ( split( /\n/, $attachments ) ) {
+                next unless ( $attach =~ /^(\d+):/ );
+                my $id = $1;
+                my $a  = $rt_handle->get_attachment( parent_id => $args{'ticket'}, id        => $id);
+                push( @{ $txn_hash->{_attachments} }, $a )
+                    if ( $a->{Filename} );
+            }
+        }
+        push @txns, $txn_hash;
+    }
+    return \@txns;
 sub run {
     my $self = shift;
     my %args = validate( @_, { ticket => 1, transactions => 1, attachments => 0 } );

Added: sd/trunk/lib/App/SD/Replica/RT/PushEncoder.pm
--- (empty file)
+++ sd/trunk/lib/App/SD/Replica/RT/PushEncoder.pm	Wed Jul  9 15:41:56 2008
@@ -0,0 +1,155 @@
+package App::SD::Replica::RT::PushEncoder;
+use Moose; 
+use Params::Validate;
+use Path::Class;
+has sync_source => 
+    ( isa => 'App::SD::Replica::RT',
+      is => 'rw');
+sub integrate_change {
+    my $self = shift;
+    my ( $change, $changeset ) = validate_pos(
+        @_,
+        { isa => 'Prophet::Change' },
+        { isa => 'Prophet::ChangeSet' }
+    );
+    my $id;
+    eval {
+        if (    $change->record_type eq 'ticket'
+            and $change->change_type eq 'add_file' 
+    )
+        {
+            $id = $self->integrate_ticket_create( $change, $changeset );
+            $self->sync_source->record_pushed_ticket(
+                uuid      => $change->record_uuid,
+                remote_id => $id
+            );
+        } elsif ( $change->record_type eq 'attachment'
+            and $change->change_type eq 'add_file' 
+        ) {
+            $id = $self->integrate_attachment( $change, $changeset );
+        } elsif ( $change->record_type eq 'comment' 
+            and $change->change_type eq 'add_file' 
+        ) {
+            $id = $self->integrate_comment( $change, $changeset );
+        } elsif ( $change->record_type eq 'ticket' ) {
+            $id = $self->integrate_ticket_update( $change, $changeset );
+        } else {
+            return undef;
+        }
+        $self->sync_source->record_pushed_transactions(
+            ticket    => $id,
+            changeset => $changeset
+        );
+    };
+    warn $@ if $@;
+    return $id;
+sub integrate_ticket_update {
+    my $self = shift;
+    my ( $change, $changeset ) = validate_pos(
+        @_,
+        { isa => 'Prophet::Change' },
+        { isa => 'Prophet::ChangeSet' }
+    );
+    # Figure out the remote site's ticket ID for this change's record
+    my $remote_ticket_id = $self->sync_source->remote_id_for_uuid( $change->record_uuid );
+    my $ticket           = RT::Client::REST::Ticket->new(
+        rt => $self->sync_source->rt,
+        id => $remote_ticket_id,
+        %{ $self->_recode_props_for_integrate($change) }
+    )->store();
+    return $remote_ticket_id;
+sub integrate_ticket_create {
+    my $self = shift;
+    my ( $change, $changeset ) = validate_pos(
+        @_,
+        { isa => 'Prophet::Change' },
+        { isa => 'Prophet::ChangeSet' }
+    );
+    # Build up a ticket object out of all the record's attributes
+    my $ticket = RT::Client::REST::Ticket->new(
+        rt    => $self->sync_source->rt,
+        queue => $self->sync_source->rt_queue(),
+        %{ $self->_recode_props_for_integrate($change) }
+    )->store( text => "Not yet pulling in ticket creation comment" );
+    return $ticket->id;
+sub integrate_comment {
+    my $self = shift;
+    my ($change, $changeset) = validate_pos( @_, { isa => 'Prophet::Change' }, {isa => 'Prophet::ChangeSet'} );
+    # Figure out the remote site's ticket ID for this change's record
+    my %props = map { $_->name => $_->new_value } $change->prop_changes;
+    my $ticket_id     = $self->sync_source->remote_id_for_uuid( $props{'ticket'} );
+    my $ticket = RT::Client::REST::Ticket->new( rt => $self->sync_source->rt, id => $ticket_id);
+    my %content = ( message => $props{'content'},   
+                );
+    if (  ($props{'type'} ||'') eq 'comment' ) {
+        $ticket->comment( %content);
+    } else {
+        $ticket->correspond(%content);
+    }
+    return $ticket_id;
+sub integrate_attachment {
+    my ($self, $change, $changeset ) = validate_pos( @_, { isa => 'App::SD::Replica::RT::PushEncoder'}, { isa => 'Prophet::Change' }, { isa => 'Prophet::ChangeSet' });
+    my %props = map { $_->name => $_->new_value } $change->prop_changes;
+    my $ticket_id = $self->sync_source->remote_id_for_uuid( $props{'ticket'});
+    my $ticket = RT::Client::REST::Ticket->new( rt => $self->sync_source->rt, id => $ticket_id );
+    my $tempdir = File::Temp::tempdir( CLEANUP => 1 );
+    my $file = file( $tempdir => ( $props{'name'} || 'unnamed' ) );
+    my $fh = $file->openw;
+    print $fh $props{content};
+    close $fh;
+    my %content = ( message     => '(See attachments)', attachments => ["$file"]);
+    $ticket->correspond(%content);
+    return $ticket_id;
+sub _recode_props_for_integrate {
+    my $self = shift;
+    my ($change) = validate_pos( @_, { isa => 'Prophet::Change' } );
+    my %props = map { $_->name => $_->new_value } $change->prop_changes;
+    my %attr;
+    for my $key ( keys %props ) {
+        next unless ( $key =~ /^(summary|queue|status|owner|custom)/ );
+        if ( $key =~ /^custom-(.*)/ ) {
+            $attr{cf}->{$1} = $props{$key};
+        } elsif ( $key eq 'summary' ) {
+            $attr{'subject'} = $props{summary};
+        } else {
+            $attr{$key} = $props{$key};
+        }
+        if ( $key eq 'status' ) {
+            $attr{$key} =~ s/^closed$/resolved/;
+        }
+    }
+    return \%attr;

