package Sprocket::Spread; use strict; use warnings; use Sprocket qw( Util::ChannelManager ); use Sprocket::Common qw( isa_HASH isa_ARRAY ); use POE qw( Driver::SysRW Wheel::ReadWrite ); use Spread; use Carp qw( croak ); use Symbol qw( gensym ); # XXX use Data::Dumper; our $sprocket_spread; sub import { my ( $class, $args ) = @_; my $package = caller(); croak "Sprocket::Spread expects its arguments in a hash ref" if ( $args && !isa_HASH( $args ) ); unless ( delete $args->{no_auto_export} ) { { no strict 'refs'; *{ $package . '::sprocket_spread' } = \$sprocket_spread; # XXX push Spread consts into their namespace? } } return if ( delete $args->{no_auto_bootstrap} ); # bootstrap __PACKAGE__->new( %$args ); return; } sub new { my $class = shift; return $sprocket_spread if ( $sprocket_spread ); my %args = &adjust_params; my $host = delete $args{host}; my $port = delete $args{port}; warn 'Unknown params passed to Sprocket::Spread: '.join(',', keys %args)."\n" if ( keys %args ); $sprocket_spread = bless( { host => $host, port => $port, cm => Sprocket::Util::ChannelManager->new(), }, ref $class || $class); $sprocket_spread->{session_id} = POE::Session->create( object_states => [ $sprocket_spread => [qw( _start _stop input error disconnect connect )] ] )->ID(); return $sprocket_spread; } sub connect { my ( $self, %args ); if ( ref $_[ KERNEL ] ) { ( $self, %args ) = @_[ OBJECT, ARG0 .. $#_ ]; } else { $self = shift; return $poe_kernel->call( $self->{session_id} => 'connect' => @_ ); } return 1 if ( $self->{connected} ); $args{host} = $self->{host} if ( $self->{host} && !$args{host} ); $args{port} = $self->{port} if ( $self->{port} && !$args{port} ); $self->{spread_name} = ( $args{port} || '4803' ) . '@' . ( $args{host} || 'localhost' ); $self->{private_name} = 'sp-' . $$; # names can't be too long, because it chops it and finds that the first part is not unique # ARGGG! #$self->{private_name} = $poe_kernel->ID(); @{$self}{qw( mbox private_group )} = Spread::connect( { spread_name => $self->{spread_name}, private_name => $self->{private_name}, } ); if ( $@ || !defined( $self->{mbox} ) || !defined( $self->{private_group} ) ) { warn "Spread connect failed: $@\n"; $self->{connected} = 0; return 0; } else { # XXX retry? my $fh = $self->{fh} = gensym(); open( $fh, "<&=$self->{mbox}" ) or die $!; $self->{wheel} = POE::Wheel::ReadWrite->new( Handle => $fh, Driver => Sprocket::Spread::Driver->new( mbox => $self->{mbox} ), Filter => Sprocket::Spread::Filter->new(), InputEvent => 'input', ErrorEvent => 'error', ); warn "spread connected with mbox: $self->{mbox} priv: $self->{private_name} and spread_name $self->{spread_name}"; $self->{connected} = 1; return 1; } } sub _start { warn "Spread started\n"; $_[KERNEL]->yield( 'connect' ); } sub _stop { warn "Spread stopped\n"; } sub error { warn "Spread error\n"; $_[OBJECT]->disconnect(); } sub input { my ( $self, $input ) = @_[ OBJECT, ARG0 ]; my ( $type, $sender, $groups, $mess_type, $endian, $message ) = @{$input}; return $self->disconnect() unless( defined( $type ) ); if ( $type & REGULAR_MESS ) { if ( defined( $endian ) && $endian ) { warn "Spread: endian mismatch!"; } $self->deliver( 'message', $self->{private_name}, { type => 'message', message => $message, group => $sender, members => $groups, index => $mess_type, } ); return; } if ( $type & TRANSITION_MESS ) { $self->deliver( 'admin', $self->{private_name}, { 'type' => 'transitional', 'group' => $sender } ); } elsif ( $type & CAUSED_BY_LEAVE && !( $type & REG_MEMB_MESS ) ) { $self->deliver( 'admin', $self->{private_name}, { 'type' => 'self_leave', 'group' => $sender } ); } elsif ( $type & REG_MEMB_MESS ) { my ( @gids, $nummem, $member ); eval { @gids = unpack( 'IIIIa*', $message ); ( $nummem, $member ) = delete @gids[ 3, 4 ]; }; if ( $@ ) { $self->deliver( 'error', 'receive', $@ ); return; } if ( $type & CAUSED_BY_DISCONNECT ) { $self->deliver( 'admin', $self->{private_name}, { type => 'disconnect', who => $member, group => $sender, members => $groups, index => $mess_type, gid => \@gids, } ); } elsif ( $type & CAUSED_BY_NETWORK ) { $self->deliver( 'admin', $self->{private_name}, { type => 'network', message => $message, group => $sender, members => $groups, index => $mess_type, gid => \@gids, } ); } elsif ( $type & CAUSED_BY_JOIN ) { $self->deliver( 'admin', $self->{private_name}, { type => 'join', who => $member, group => $sender, members => $groups, index => $mess_type, gid => \@gids, } ); } elsif ( $type & CAUSED_BY_LEAVE ) { $self->deliver( 'admin', $self->{private_name}, { type => 'leave', who => $member, group => $sender, members => $groups, index => $mess_type, gid => \@gids, } ); } else { $self->deliver( 'error', 'receive', 'unknown packet type' ); } } else { $self->deliver( 'error', 'receive', 'unknown packet type' ); } return; } sub deliver { my ( $self, $type ) = ( shift, shift ); my ( $errtype, $privname, $msg ); if ( $type eq 'error' ) { ( $errtype, $msg ) = @_; warn "error: $errtype $msg\n"; } else { ( $privname, $msg ) = @_; $self->{cm}->deliver( $type, $privname, $msg ); } # XXX print STDERR 'msg:'.$privname.' '.Data::Dumper->Dump([$msg]); return; } sub disconnect { my ( $self ); if ( ref $_[ KERNEL ] ) { $self = $_[ OBJECT ]; } else { $self = shift; return $poe_kernel->call( $self->{session_id} => 'disconnect' => @_ ); } # TODO warn "Spread disconnect\n"; $self->{connected} = 0; } sub publish { my ( $self, $groups, $message, $mess_type, $flag ) = @_; unless ( $self->{mbox} ) { warn "not connected when trying to publish to spread"; return; } $flag = SAFE_MESS unless( defined( $flag ) ); $mess_type = 0 unless( defined( $mess_type ) ); $groups = $groups->[0] if ( isa_ARRAY( $groups ) && $#{$groups} == 0 ); require Data::Dumper; warn "groups:".Data::Dumper->Dump([$groups]); my $ret; eval { $ret = Spread::multicast( $self->{mbox}, $flag, $groups, $mess_type, $message ); }; if ( $@ || !defined( $ret ) || $ret < 0 ) { $self->disconnect() if ( defined $sperrno && $sperrno == CONNECTION_CLOSED ); return 0; } return 1; } sub subscribe { my ( $self, $groups ) = @_; unless ( $self->{connected} ) { unless ( $self->connect() ) { return 0; } } $groups = $groups->[0] if ( isa_ARRAY( $groups ) && $#{$groups} == 0 ); my $ret; eval { $ret = Spread::join( $self->{mbox}, $groups ); }; if ( $@ && !$ret ) { $self->disconnect() if ( defined $sperrno && $sperrno == CONNECTION_CLOSED ); return 0; } return 1; } sub unsubscribe { my ( $self, $groups ) = @_; unless ( $self->{connected} ) { unless ( $self->connect() ) { return 0; } } $groups = $groups->[0] if ( isa_ARRAY( $groups ) && $#{$groups} == 0 ); my $ret; eval { $ret = Spread::leave( $self->{mbox}, $groups ); }; if ( $@ && !$ret ) { $self->disconnect() if ( defined $sperrno && $sperrno == CONNECTION_CLOSED ); return 0; } return 1; } sub plugin_subscribe { my ( $self, $plugin, $groups ) = @_; return $self->subscribe( $self->{cm}->subscribe( $plugin, $groups ) ); } sub plugin_unsubscribe { my ( $self, $plugin, $groups ) = @_; return $self->unsubscribe( $self->{cm}->unsubscribe( $plugin, $groups ) ); } sub plugin_publish { my ( $self, $plugin, $groups ) = ( shift, shift, shift ); my $pub = $self->{cm}->grouplist( $plugin, $groups ); require Data::Dumper; warn "plugin publish: $pub ".Data::Dumper->Dump([$pub]); return $self->publish( $pub, @_ ); } 1; package Sprocket::Spread::Driver; use strict; use warnings; sub new { my $class = shift; my %args = @_; my $mbox = delete $args{mbox}; warn "bad Spread param: ".join( ',', keys %args ) if ( keys %args ); bless( [ $mbox ], ref $class || $class ); } sub get { my ( $self, $fh ) = @_; my ( $type, $sender, $groups, $messt, $endian, $message ) = Spread::receive( $self->[ 0 ] ); if ( !defined( $type ) ) { warn "Spread: Unknown error"; return []; } return [ $type, $sender, $groups, $messt, $endian, $message ]; } 1; package Sprocket::Spread::Filter; use strict; use warnings; sub new { my $class = shift; bless( [], ref $class || $class ); } sub get { shift; return [ @_ ]; } 1;