package Sprocket::Util::Memcached; use Sprocket qw( Util::Observable ); use Class::Accessor::Fast; use base qw( Sprocket::Util::Observable Class::Accessor::Fast ); use POE qw( Filter::Memcached ); use strict; use warnings; sub new { my $class = shift; my $self = bless( { @_ }, ref $class || $class ); $self->register_hook( [ map { 'sprocket.memcached.'.$_ } qw( connected receive ) ] ); return $self; } # external object methods sub get { my ( $self, $data ) = @_; $self->_queue( get => $data ); $self->{con}->send({ cmd => 'get', ( map { exists( $data->{$_} ) ? ( $_ => $data->{$_} ) : () } qw( key keys ) ) }); } sub set { my ( $self, $data, $cmd ) = @_; $cmd ||= 'set'; $self->_queue( $cmd => $data ); $self->{con}->send({ cmd => $cmd, key => $data->{key}, obj => $data->{obj}, ( defined( $data->{flags} ) ? ( flags => $data->{flags} ) : () ), ( defined( $data->{exp} ) ? ( exp => $data->{exp} ) : () ) }); } sub add { my $self = shift; $self->set( $_[0], 'add' ); } sub replace { my $self = shift; $self->set( $_[0], 'replace' ); } sub delete { my ( $self, $data ) = @_; $self->_queue( delete => $data ); $self->{con}->send({ cmd => 'delete', key => $data->{key}, ( defined( $data->{time} ) ? ( time => $data->{time} ) : () ), }); } sub incr { my ( $self, $data ) = @_; $self->_queue( incr => $data ); $self->{con}->send({ cmd => 'incr', key => $data->{key}, ( defined( $data->{by} ) ? ( by => $data->{by} ) : () ), }); } sub decr { my ( $self, $data ) = @_; $self->_queue( decr => $data ); $self->{con}->send({ cmd => 'decr', key => $data->{key}, ( defined( $data->{by} ) ? ( by => $data->{by} ) : () ), }); } sub version { my ( $self, $data ) = @_; $self->_queue( version => $data ); $self->{con}->send({ cmd => 'version', }); } sub stats { my ( $self, $data ) = @_; $self->_queue( stats => $data ); $self->{con}->send({ cmd => 'stats', }); } sub flush_all { my ( $self, $data ) = @_; $self->_queue( flush_all => $data ); $self->{con}->send({ cmd => 'flush_all', }); } # XXX don't implement quit unless needed 1; package Sprocket::Plugin::Memcached; use Sprocket::Plugin; use base 'Sprocket::Plugin'; use POE qw( Filter::Memcached ); use Scalar::Util qw( reftype ); use strict; use warnings; sub new { my $class = shift; $class->SUPER::new( name => 'Memcached', queue => [], @_ ); } # --------------------------------------------------------- # Client sub remote_connected { my $self = shift; my ( $client, $con, $socket ) = @_; $self->take_connection( $con ); # POE::Filter::Stackable object: $con->filter->push( POE::Filter::Memcached->new() ); $con->filter->shift(); # POE::Filter::Stream $memcached->broadcast( 'sprocket.memcached.connected', { source => $self, connection => $con, } ); return; } sub remote_receive { my $self = shift; my ( $client, $con, $data ) = @_; # $self->_log(v => 4, msg => 'data:'.Data::Dumper->Dump([$data])); # pull off the top of the queue my ( $cmd, $item ) = $self->_queue; $item->{cmd} = $cmd; # $data->{cmd} ||= 'unknown'; # $self->_log( v => 4, msg => "cmd in: $data->{cmd} queue: $cmd" ); $memcached->broadcast( 'sprocket.memcached.receive', { source => $self, connection => $con, item => $item, data => $data, } ); my $callback = delete $item->{callback}; $callback->( $item, $data ) if ( reftype( $callback ) eq 'CODE' ); return 1; } # --------------------------------------------------------- # put or pull from the queue # DO NOT call this yourself sub _queue { my $self = shift if ( @_ ) { push( @{$self->{queue}}, [ @_ ] ); } else { my $out = shift @{$self->{queue}}; return $out ? @{ $out } : (); } }