[Bps-public-commit] r13924 - in sd/trunk/lib/App/SD/Replica: RT
jesse at bestpractical.com
jesse at bestpractical.com
Wed Jul 9 15:42:05 EDT 2008
Author: jesse
Date: Wed Jul 9 15:41:56 2008
New Revision: 13924
Added:
sd/trunk/lib/App/SD/Replica/RT/PushEncoder.pm
Modified:
sd/trunk/lib/App/SD/Replica/RT.pm
sd/trunk/lib/App/SD/Replica/RT/PullEncoder.pm
Log:
* Extracting the push and pull specific bits of logic into separate classes
Modified: sd/trunk/lib/App/SD/Replica/RT.pm
==============================================================================
--- sd/trunk/lib/App/SD/Replica/RT.pm (original)
+++ sd/trunk/lib/App/SD/Replica/RT.pm Wed Jul 9 15:41:56 2008
@@ -17,58 +17,6 @@
has rt_queue => ( isa => 'Str', is => 'rw');
has rt_query => ( isa => 'Str', is => 'rw');
-=head1 NOTES ON PUSH
-
-If the remote storage (RT) can not represent a whole changeset along with the prophet changeset uuid, then we need to
-create a seperate locally(?) stored map of:
- remote-subchangeset-identifier to changeset uuid.
- remote id to prophet record uuid
-
-
-For each sync of the same remote source (RT), we need a unique prophet database domain.
-
-if clkao syncs from RT, jesse can sync with clkao but not with RT directly with the same database.
-
-
-
-
-
-
-
-
-Push to rt algorithm
-
-apply a single changeset that's part of the push:
- - for each record in that changeset:
- - pull the record's txn list from the server
- - for each txn we don't know we've already seen, look at it
- - if it is from the changeset we just pushed, then
- store the id of the new transaction and originating uuid in the push-ticket store.
- - does that let us specify individual txns? or is it a high-water mark?
- - if it is _not_ from the changeset we just pushed, then
- do we just ignore it?
- how do we mark an out-of-order transaction as not-pulled?
-
-
-
-Changesets we want to push from SD to RT and how they map
-
-
-what do we do with cfs rt doesn't know about?
-
-
-
-SD::Source::RT->recode_ticket
-
-
-
-=cut
-
-=head2 setup
-
-
-=cut
-
# XXX: this should be called from superclass, or better, have individual attributes have their own builders.
around 'new' => sub {
@@ -252,220 +200,83 @@
{ isa => 'Prophet::Change' },
{ isa => 'Prophet::ChangeSet' }
);
- my $id;
- eval {
- if ( $change->record_type eq 'ticket'
- and $change->change_type eq 'add_file'
- )
- {
- $id = $self->integrate_ticket_create( $change, $changeset );
- $self->record_pushed_ticket(
- uuid => $change->record_uuid,
- remote_id => $id
- );
- } elsif ( $change->record_type eq 'attachment'
- and $change->change_type eq 'add_file'
-
- ) {
- $id = $self->integrate_attachment( $change, $changeset );
- } elsif ( $change->record_type eq 'comment'
- and $change->change_type eq 'add_file'
- ) {
- $id = $self->integrate_comment( $change, $changeset );
- } elsif ( $change->record_type eq 'ticket' ) {
- $id = $self->integrate_ticket_update( $change, $changeset );
+ require App::SD::Replica::RT::PushEncoder;
+ my $recoder = App::SD::Replica::RT::PushEncoder->new( { sync_source => $self } );
+ $recoder->integrate_change($change,$changeset);
+}
- } else {
- return undef;
- }
+=head2 uuid
- $self->record_pushed_transactions(
- ticket => $id,
- changeset => $changeset
- );
+Return the replica SVN repository's UUID
- };
- warn $@ if $@;
- return $id;
-}
+=cut
-sub integrate_ticket_update {
+sub uuid {
my $self = shift;
- my ( $change, $changeset ) = validate_pos(
- @_,
- { isa => 'Prophet::Change' },
- { isa => 'Prophet::ChangeSet' }
- );
-
- # Figure out the remote site's ticket ID for this change's record
- my $remote_ticket_id = $self->remote_id_for_uuid( $change->record_uuid );
- my $ticket = RT::Client::REST::Ticket->new(
- rt => $self->rt,
- id => $remote_ticket_id,
- %{ $self->_recode_props_for_integrate($change) }
- )->store();
+ return $self->uuid_for_url( join( '/', $self->rt_url, $self->rt_query ) );
- return $remote_ticket_id;
}
-sub integrate_ticket_create {
+sub traverse_changesets {
my $self = shift;
- my ( $change, $changeset ) = validate_pos(
- @_,
- { isa => 'Prophet::Change' },
- { isa => 'Prophet::ChangeSet' }
+ my %args = validate( @_,
+ { after => 1,
+ callback => 1,
+ }
);
- # Build up a ticket object out of all the record's attributes
- my $ticket = RT::Client::REST::Ticket->new(
- rt => $self->rt,
- queue => $self->rt_queue(),
- %{ $self->_recode_props_for_integrate($change) }
- )->store( text => "Not yet pulling in ticket creation comment" );
+ require App::SD::Replica::RT::PullEncoder;
+ my $recoder = App::SD::Replica::RT::PullEncoder->new( { sync_source => $self } );
+ $recoder->pull( query => $self->rt_query, after => $args{'after'}, callback => $args{'callback'});
- return $ticket->id;
}
-sub integrate_comment {
- my $self = shift;
- my ($change, $changeset) = validate_pos( @_, { isa => 'Prophet::Change' }, {isa => 'Prophet::ChangeSet'} );
- # Figure out the remote site's ticket ID for this change's record
+=head1 NOTES ON PUSH
- my %props = map { $_->name => $_->new_value } $change->prop_changes;
+If the remote storage (RT) can not represent a whole changeset along with the prophet changeset uuid, then we need to
+create a seperate locally(?) stored map of:
+ remote-subchangeset-identifier to changeset uuid.
+ remote id to prophet record uuid
+
- my $ticket_id = $self->remote_id_for_uuid( $props{'ticket'} );
- my $ticket = RT::Client::REST::Ticket->new( rt => $self->rt, id => $ticket_id);
+For each sync of the same remote source (RT), we need a unique prophet database domain.
- my %content = ( message => $props{'content'},
- );
+if clkao syncs from RT, jesse can sync with clkao but not with RT directly with the same database.
- if ( ($props{'type'} ||'') eq 'comment' ) {
- $ticket->comment( %content);
- } else {
- $ticket->correspond(%content);
- }
- return $ticket_id;
-}
-sub integrate_attachment {
- my ($self, $change, $changeset ) = validate_pos( @_, { isa => 'App::SD::Replica::RT'}, { isa => 'Prophet::Change' }, { isa => 'Prophet::ChangeSet' });
+Push to rt algorithm
+apply a single changeset that's part of the push:
+ - for each record in that changeset:
+ - pull the record's txn list from the server
+ - for each txn we don't know we've already seen, look at it
+ - if it is from the changeset we just pushed, then
+ store the id of the new transaction and originating uuid in the push-ticket store.
+ - does that let us specify individual txns? or is it a high-water mark?
+ - if it is _not_ from the changeset we just pushed, then
+ do we just ignore it?
+ how do we mark an out-of-order transaction as not-pulled?
+
- my %props = map { $_->name => $_->new_value } $change->prop_changes;
- my $ticket_id = $self->remote_id_for_uuid( $props{'ticket'});
- my $ticket = RT::Client::REST::Ticket->new( rt => $self->rt, id => $ticket_id );
- my $tempdir = File::Temp::tempdir( CLEANUP => 1 );
- my $file = file( $tempdir => ( $props{'name'} || 'unnamed' ) );
- my $fh = $file->openw;
- print $fh $props{content};
- close $fh;
- my %content = ( message => '(See attachments)', attachments => ["$file"]);
- $ticket->correspond(%content);
- return $ticket_id;
-}
+Changesets we want to push from SD to RT and how they map
+
+what do we do with cfs rt doesn't know about?
-sub _recode_props_for_integrate {
- my $self = shift;
- my ($change) = validate_pos( @_, { isa => 'Prophet::Change' } );
- my %props = map { $_->name => $_->new_value } $change->prop_changes;
- my %attr;
- for my $key ( keys %props ) {
- next unless ( $key =~ /^(summary|queue|status|owner|custom)/ );
- if ( $key =~ /^custom-(.*)/ ) {
- $attr{cf}->{$1} = $props{$key};
- } elsif ( $key eq 'summary' ) {
- $attr{'subject'} = $props{summary};
- } else {
- $attr{$key} = $props{$key};
- }
- if ( $key eq 'status' ) {
- $attr{$key} =~ s/^closed$/resolved/;
- }
- }
- return \%attr;
-}
+SD::Source::RT->recode_ticket
-=head2 uuid
-Return the replica SVN repository's UUID
=cut
-sub uuid {
- my $self = shift;
- return $self->uuid_for_url( join( '/', $self->rt_url, $self->rt_query ) );
-
-}
-
-sub traverse_changesets {
- my $self = shift;
- my %args = validate(
- @_,
- { after => 1,
- callback => 1,
- }
- );
-
- my $first_rev = ( $args{'after'} + 1 ) || 1;
-
- require App::SD::Replica::RT::PullEncoder;
- my $recoder
- = App::SD::Replica::RT::PullEncoder->new( { sync_source => $self } );
- for my $id ( $self->find_matching_tickets ) {
-
- # XXX: _recode_transactions should ignore txn-id <= $first_rev
- $args{callback}->($_)
- for @{
- $recoder->run(
- ticket => $self->rt->show( type => 'ticket', id => $id ),
- transactions => $self->find_matching_transactions(
- ticket => $id,
- starting_transaction => $first_rev
- ),
-
- )
- };
- }
-}
-
-sub find_matching_tickets {
- my $self = shift;
- return $self->rt->search( type => 'ticket', query => $self->rt_query );
-}
-
-sub find_matching_transactions {
- my $self = shift;
- my %args = validate( @_, { ticket => 1, starting_transaction => 1 } );
- my @txns;
- for my $txn ( sort $self->rt->get_transaction_ids( parent_id => $args{ticket} ) ) {
- next if $txn < $args{'starting_transaction'}; # Skip things we've pushed
- next if $self->prophet_has_seen_transaction($txn);
- my $txn_hash = $self->rt->get_transaction(
- parent_id => $args{ticket},
- id => $txn,
- type => 'ticket'
- );
- if ( my $attachments = delete $txn_hash->{'Attachments'} ) {
- foreach my $attach ( split( /\n/, $attachments ) ) {
- next unless ( $attach =~ /^(\d+):/ );
- my $id = $1;
- my $a = $self->rt->get_attachment( parent_id => $args{'ticket'}, id => $id);
-
- push( @{ $txn_hash->{_attachments} }, $a )
- if ( $a->{Filename} );
+=head2 setup
- }
- }
- push @txns, $txn_hash;
- }
- return \@txns;
-}
+=cut
1;
Modified: sd/trunk/lib/App/SD/Replica/RT/PullEncoder.pm
==============================================================================
--- sd/trunk/lib/App/SD/Replica/RT/PullEncoder.pm (original)
+++ sd/trunk/lib/App/SD/Replica/RT/PullEncoder.pm Wed Jul 9 15:41:56 2008
@@ -4,7 +4,6 @@
package App::SD::Replica::RT::PullEncoder;
use Moose;
-use base qw/Class::Accessor/;
use Params::Validate qw(:all);
use UNIVERSAL::require;
@@ -17,6 +16,75 @@
our $DEBUG = $Prophet::Handle::DEBUG;
+
+
+sub pull {
+ my $self = shift;
+ my %args = validate( @_,
+ { after => 1,
+ callback => 1,
+ query => 1
+ }
+ );
+
+ my $first_rev = ( $args{'after'} + 1 ) || 1;
+
+ for my $id ( $self->find_matching_tickets($args{'query'} )) {
+ # XXX: _recode_transactions should ignore txn-id <= $first_rev
+ $args{callback}->($_)
+ for @{
+ $self->run(
+ ticket => $self->sync_source->rt->show( type => 'ticket', id => $id ),
+ transactions => $self->find_matching_transactions(
+ ticket => $id,
+ starting_transaction => $first_rev
+ ),
+
+ )
+ };
+ }
+}
+
+sub find_matching_tickets {
+ my $self = shift;
+ my ($query) = validate_pos(@_, 1);
+ return $self->sync_source->rt->search( type => 'ticket', query => $query );
+}
+
+sub find_matching_transactions {
+ my $self = shift;
+ my %args = validate( @_, { ticket => 1, starting_transaction => 1 } );
+ my @txns;
+
+ my $rt_handle = $self->sync_source->rt;
+
+ for my $txn ( sort $rt_handle->get_transaction_ids( parent_id => $args{ticket} ) ) {
+ next if $txn < $args{'starting_transaction'}; # Skip things we've pushed
+ next if $self->sync_source->prophet_has_seen_transaction($txn);
+ my $txn_hash = $rt_handle->get_transaction(
+ parent_id => $args{ticket},
+ id => $txn,
+ type => 'ticket'
+ );
+ if ( my $attachments = delete $txn_hash->{'Attachments'} ) {
+ foreach my $attach ( split( /\n/, $attachments ) ) {
+ next unless ( $attach =~ /^(\d+):/ );
+ my $id = $1;
+ my $a = $rt_handle->get_attachment( parent_id => $args{'ticket'}, id => $id);
+
+ push( @{ $txn_hash->{_attachments} }, $a )
+ if ( $a->{Filename} );
+
+ }
+
+ }
+ push @txns, $txn_hash;
+ }
+ return \@txns;
+}
+
+
+
sub run {
my $self = shift;
my %args = validate( @_, { ticket => 1, transactions => 1, attachments => 0 } );
Added: sd/trunk/lib/App/SD/Replica/RT/PushEncoder.pm
==============================================================================
--- (empty file)
+++ sd/trunk/lib/App/SD/Replica/RT/PushEncoder.pm Wed Jul 9 15:41:56 2008
@@ -0,0 +1,155 @@
+package App::SD::Replica::RT::PushEncoder;
+use Moose;
+use Params::Validate;
+use Path::Class;
+has sync_source =>
+ ( isa => 'App::SD::Replica::RT',
+ is => 'rw');
+
+
+sub integrate_change {
+ my $self = shift;
+ my ( $change, $changeset ) = validate_pos(
+ @_,
+ { isa => 'Prophet::Change' },
+ { isa => 'Prophet::ChangeSet' }
+ );
+ my $id;
+ eval {
+ if ( $change->record_type eq 'ticket'
+ and $change->change_type eq 'add_file'
+ )
+ {
+ $id = $self->integrate_ticket_create( $change, $changeset );
+ $self->sync_source->record_pushed_ticket(
+ uuid => $change->record_uuid,
+ remote_id => $id
+ );
+
+ } elsif ( $change->record_type eq 'attachment'
+ and $change->change_type eq 'add_file'
+
+ ) {
+ $id = $self->integrate_attachment( $change, $changeset );
+ } elsif ( $change->record_type eq 'comment'
+ and $change->change_type eq 'add_file'
+ ) {
+ $id = $self->integrate_comment( $change, $changeset );
+ } elsif ( $change->record_type eq 'ticket' ) {
+ $id = $self->integrate_ticket_update( $change, $changeset );
+
+ } else {
+ return undef;
+ }
+
+ $self->sync_source->record_pushed_transactions(
+ ticket => $id,
+ changeset => $changeset
+ );
+
+ };
+ warn $@ if $@;
+ return $id;
+}
+
+sub integrate_ticket_update {
+ my $self = shift;
+ my ( $change, $changeset ) = validate_pos(
+ @_,
+ { isa => 'Prophet::Change' },
+ { isa => 'Prophet::ChangeSet' }
+ );
+
+ # Figure out the remote site's ticket ID for this change's record
+ my $remote_ticket_id = $self->sync_source->remote_id_for_uuid( $change->record_uuid );
+ my $ticket = RT::Client::REST::Ticket->new(
+ rt => $self->sync_source->rt,
+ id => $remote_ticket_id,
+ %{ $self->_recode_props_for_integrate($change) }
+ )->store();
+
+ return $remote_ticket_id;
+}
+
+sub integrate_ticket_create {
+ my $self = shift;
+ my ( $change, $changeset ) = validate_pos(
+ @_,
+ { isa => 'Prophet::Change' },
+ { isa => 'Prophet::ChangeSet' }
+ );
+
+ # Build up a ticket object out of all the record's attributes
+ my $ticket = RT::Client::REST::Ticket->new(
+ rt => $self->sync_source->rt,
+ queue => $self->sync_source->rt_queue(),
+ %{ $self->_recode_props_for_integrate($change) }
+ )->store( text => "Not yet pulling in ticket creation comment" );
+
+ return $ticket->id;
+}
+
+sub integrate_comment {
+ my $self = shift;
+ my ($change, $changeset) = validate_pos( @_, { isa => 'Prophet::Change' }, {isa => 'Prophet::ChangeSet'} );
+
+ # Figure out the remote site's ticket ID for this change's record
+
+ my %props = map { $_->name => $_->new_value } $change->prop_changes;
+
+ my $ticket_id = $self->sync_source->remote_id_for_uuid( $props{'ticket'} );
+ my $ticket = RT::Client::REST::Ticket->new( rt => $self->sync_source->rt, id => $ticket_id);
+
+ my %content = ( message => $props{'content'},
+ );
+
+ if ( ($props{'type'} ||'') eq 'comment' ) {
+ $ticket->comment( %content);
+ } else {
+ $ticket->correspond(%content);
+ }
+ return $ticket_id;
+}
+
+sub integrate_attachment {
+ my ($self, $change, $changeset ) = validate_pos( @_, { isa => 'App::SD::Replica::RT::PushEncoder'}, { isa => 'Prophet::Change' }, { isa => 'Prophet::ChangeSet' });
+
+
+ my %props = map { $_->name => $_->new_value } $change->prop_changes;
+ my $ticket_id = $self->sync_source->remote_id_for_uuid( $props{'ticket'});
+ my $ticket = RT::Client::REST::Ticket->new( rt => $self->sync_source->rt, id => $ticket_id );
+
+ my $tempdir = File::Temp::tempdir( CLEANUP => 1 );
+ my $file = file( $tempdir => ( $props{'name'} || 'unnamed' ) );
+ my $fh = $file->openw;
+ print $fh $props{content};
+ close $fh;
+ my %content = ( message => '(See attachments)', attachments => ["$file"]);
+ $ticket->correspond(%content);
+ return $ticket_id;
+}
+
+
+sub _recode_props_for_integrate {
+ my $self = shift;
+ my ($change) = validate_pos( @_, { isa => 'Prophet::Change' } );
+
+ my %props = map { $_->name => $_->new_value } $change->prop_changes;
+ my %attr;
+
+ for my $key ( keys %props ) {
+ next unless ( $key =~ /^(summary|queue|status|owner|custom)/ );
+ if ( $key =~ /^custom-(.*)/ ) {
+ $attr{cf}->{$1} = $props{$key};
+ } elsif ( $key eq 'summary' ) {
+ $attr{'subject'} = $props{summary};
+ } else {
+ $attr{$key} = $props{$key};
+ }
+ if ( $key eq 'status' ) {
+ $attr{$key} =~ s/^closed$/resolved/;
+ }
+ }
+ return \%attr;
+}
+1;
More information about the Bps-public-commit
mailing list