Logo
Distributed Genetic Programming Framework
print print

File org.dgpf.search.api.p2p.SearchP2P.java

Here you can find all the information about the file org.dgpf.search.api.p2p.SearchP2P.java. You may explore it here or download it onto your local disk.
/*
 * Copyright (c) 2006 Thomas Weise
 *
 * E-Mail           : tweise@gmx.de
 * Creation Date    : 2006-04-10 11:36:37
 * Original Filename: org.dgpf.search.api.p2p.SearchP2P.java
 * Version          : 2.1.0
 * Last modification: 2006-05-12
 *                by: Thomas Weise
 *
 * License          : GNU LESSER GENERAL PUBLIC LICENSE
 *                    Version 2.1, February 1999
 *                    You should have received a copy of this license along
 *                    with this library; if not, write to the Free Software
 *                    Foundation, Inc., 59 Temple Place, Suite 330, Boston,
 *                    MA 02111-1307, USA or download the license under
 *                    http://www.gnu.org/copyleft/lesser.html.
 *
 * Warranty         : This software is provided "as is" without any
 *                    warranty; without even the implied warranty of
 *                    merchantability or fitness for a particular purpose.
 *                    See the Gnu Lesser General Public License for more
 *                    details.
 */


package org.dgpf.search.api.p2p;

import java.io.ObjectInputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.net.Socket;
import java.util.Arrays;

import org.dgpf.search.api.Individual;
import org.dgpf.search.api.IndividualComparator;
import org.dgpf.search.api.SearchEngine;
import org.dgpf.search.api.SearchState;
import org.sfc.events.EventPropagator;
import org.sfc.net.p2p.PeerToPeer;
import org.sfc.parallel.SfcThreadGroup;
import org.sfc.utils.Typesafe;

/**
 * This class is the peer-to-peer node used by the genetic engine.
 * @param <Genotype>  The genotype of the individuals evolved. It must be
 *                    serializable.
 *
 * @author Thomas Weise
 */

public class SearchP2P<Genotype extends Serializable>
                                extends PeerToPeer
  {

/**
 * The buffer for the incoming individuals.
 */

  private final     CircularIndividualBuffer<Genotype> m_incomming ;
/**
 * The buffer with the individuals to be sent.
 */

          final     CircularIndividualBuffer<Genotype> m_outgoing  ;
/**
 * The genetic emigrator thread.
 */

  private final     IndividualEmigrator                m_emigrator ;
/**
 * The owning genetic engine.
 */

          final     SearchEngine<Genotype>             m_owner   ;
/**
 * The count of currently emigrated individuals.
 */

  private           int                             m_emigrants  ;
/**
 * The count of rejected immigrants.
 */

  private           int                             m_rejected_immigrants ;
/**
 * The count of sub-fitness functions.
 */

  private final     int                             m_fitness_count ;
/**
 * The immigration fitness threshold.
 * @see #assign(SearchState, P2PData, IndividualComparator)
 */

  private final     double[]                        m_immigration_th  ;
/**
 * The internally used individual comparator.
 */

  private           IndividualComparator            m_comparator  ;
/**
 * If immigration by level is on.
 */

  private           boolean                         m_by_level  ;
/**
 * The current immigration level.
 */

                    long                            m_level ;
  
/**
 * Instantiate a peer-to-peer node for a search engine. The node starts
 * with every immigration threshold set to
 * <code>Double.POSITIVE_INFINITY</code> and with the default
 * immigration-by-level setting of <code>P2PSearchUtils</code>.
 * @param p_owner   The search engine this node belongs to.
 * @param p_threads The thread group handed to the base class and to the
 *                  emigrator.
 * @param p_events  The event propagator handed to the base class.
 * @param p_port    The port handed to the base class.
 * @param p_outbuf  The initial size of the buffer of outgoing individuals.
 * @param p_inbuf   The initial size of the buffer of incoming individuals.
 * @param p_fitness_count The count of fitness functions applied; also the
 *                        length of the threshold array.
 * @param p_comparator  The individual comparator used by both buffers and
 *                      by this node.
 */

  public SearchP2P(final  SearchEngine<Genotype>  p_owner,
                   final  SfcThreadGroup          p_threads,
                   final  EventPropagator         p_events,
                   final  int                     p_port,
                   final  int                     p_outbuf,
                   final  int                     p_inbuf,
                          int                     p_fitness_count,
                   final  IndividualComparator    p_comparator)
    {
    super(p_threads, p_events, p_port);

    // the buffers are created before the emigrator, which receives "this"
    this.m_incomming = new CircularIndividualBuffer<Genotype>(
                                                   p_comparator, p_inbuf);
    this.m_outgoing  = new CircularIndividualBuffer<Genotype>(
                                                   p_comparator, p_outbuf);
    this.m_emigrator = new IndividualEmigrator<Genotype>(this, p_threads);

    this.m_owner         = p_owner;
    this.m_fitness_count = p_fitness_count;
    this.m_comparator    = p_comparator;
    this.m_by_level      = P2PSearchUtils.DEFAULT_IMMIGRATION_BY_LEVEL;

    // initially no threshold is active
    this.m_immigration_th = new double[p_fitness_count];
    Arrays.fill(this.m_immigration_th, Double.POSITIVE_INFINITY);
    }

/**
 * <p>
 * Set the immigration thresholds.
 * </p>
 * <p>
 * Thresholds for immigration: immigrants may only enter if they have
 * fitnesses that are weaker than all the values specified here. For each
 * fitness function, there can one threshold value be defined denying all
 * immigrants with fitnesses above this value immigration. To disable this
 * threshold, <code>Double.POSITIVE_INFINITY</code> can be specified.
 * </p><p>
 * You may use this property to protect the local evolution from external
 * influences.</p>
 *
 * @param p_s     The search state governing the search.
 * @param p_gs    The peer-to-peer data object to obtain the data from.
 * @param p_comp  The individual comparator to use.
 */

  public synchronized final void  assign
                      (final SearchState<Genotype>     p_s,
                       final P2PData                   p_gs,                       
                       final IndividualComparator      p_comp)
    {
    final double[]                            l_d;
          int                                 l_s, l_i, l_bs;
          double                              l_q;
          CircularIndividualBuffer<Genotype>  l_gcb;
          boolean                             l_b;
          Individual<Genotype>                l_x;

    if(p_comp != null)
      {
      this.m_outgoing.m_comparator  = p_comp;
      this.m_incomming.m_comparator = p_comp;
      this.m_comparator             = p_comp;
      }
    
    this.m_by_level = p_gs.get_immigration_by_level();
    this.m_level    = p_s.get_update_level();

    l_d = this.m_immigration_th;
    l_b = false;
    for(l_i = (l_d.length-1); l_i >= 0; l_i--)
      {
      l_q = p_gs.get_immigration_threshold(l_i);
      if(l_q >= 0.0d)
        {
        if(l_q < l_d[l_i]) l_b = true;
        l_d[l_i] = l_q;
        }
      }
    
    
        
    l_s   = p_gs.get_allowed_emigrants();
    if(p_gs.get_emigrate_best()) l_s++;    
    this.m_outgoing.set_size(l_s);
//    l_gcb = this.m_outgoing;
//    l_bs  = l_gcb.get_size();
//    if(l_s != l_bs)
//      {
//      if( (l_bs > (3*l_s)) || (l_bs < l_s) )
//        {
//        l_gcb.set_size(l_s);
//        }
//      }
    
      l_gcb = this.m_incomming;
      l_gcb.set_size(p_gs.get_allowed_immigrants());
          
//    l_s   = p_gs.get_allowed_immigrants();
//    l_bs  = l_gcb.get_size();
//    if(l_s != l_bs)
//      {
//      if( (l_bs > (3*l_s)) || (l_bs < l_s) )
//        {
//        l_gcb.set_size(l_s);
//        }
//      }


// check the individuals since the threshold has decreased
    if(l_b)
      {
      for(l_bs = l_gcb.get_count(); l_bs > 0; l_bs--)
        {
        l_x = l_gcb.nonblocking_dequeue();
        if(check(l_x, l_d))
          {
          this.m_rejected_immigrants++;
          continue;
          }
        l_gcb.enqueue(l_x);
        }      
      }
    }


/**
 * Enque an individual for emmigration.
 * @param p_individual  The individual for emmigration.
 */

  public  final void  emmigrate (Individual<Genotype> p_individual)
    {
    p_individual = Typesafe.cast(p_individual.clone());
    this.m_outgoing.enqueue(p_individual);
    }


/**
 * This method will be called for all incomming sockets. It should do
 * whatever needs to be done.
 * @param p_socket  The incomming socket.
 * @throws  Throwable Whenever something goes wrong.
 */

  @Override
  protected final void  handle_socket (final Socket p_socket)
                                                      throws Throwable
    {
          int                     l_i;
    final int                     l_fc;
          ObjectInputStream       l_ois;
          Individual<Genotype>    l_id;
    final double[]                l_th;
    final IndividualComparator    l_ic;
          boolean                 l_b;
    final OutputStream            l_oos;
    final SearchEngine<Genotype>  l_owner;
   
    l_i   = this.m_incomming.get_free_slots();
    l_oos = p_socket.getOutputStream();
    try
      {
      l_oos.write(l_i & 0xff);
      l_oos.write((l_i >>> 8) & 0xff);
      l_oos.write((l_i >>> 16) & 0xff);
      l_oos.write((l_i >>> 24) & 0xff);
      l_oos.flush();
          
      if(l_i > 0)
        {    
        l_ois = new ObjectInputStream(p_socket.getInputStream());
        try
          {
          l_i     = l_ois.readInt();
          l_fc    = this.m_fitness_count;
          l_th    = this.m_immigration_th;
          l_ic    = this.m_comparator;
          l_owner = this.m_owner;
          
          synchronized(this)
            {
            l_b   = (!(this.m_by_level &&
                    ((l_ois.readLong() <= ((11*this.m_level)/10)))));
            }
      
main:
          for(; l_i > 0; l_i--) //here we could include is_running()
            {
            l_id = Typesafe.cast(Individual.receive(l_ois, l_fc));
      
            synchronized(this)
              {
              if(l_ic.is_useless(l_id))
                {
                this.m_rejected_immigrants++;
                continue main;
                }
              
              if(l_owner.check_best(l_id))
                {
                l_b = false;
                Arrays.fill(l_th, Double.POSITIVE_INFINITY);
                }
              else if(l_b && check(l_id, l_th))
                {
                this.m_rejected_immigrants++;
                continue main;
                }
              }
      
            this.m_incomming.enqueue(l_id);
            }
          }
        finally
          {    
          l_ois.close();
          }
        }
      }
    finally
      {
      l_oos.close();
      }
    }

/**
 * Check an individual's fitness values.
 * @param p_id  The individual of concern.
 * @param p_th  The fitness threshold array.
 * @return  <code>false</code> if and only if the individual is ok.
 */

  private final boolean check (final Individual p_id,
                               final double[]   p_th)
    {
    int l_i;
    
    for(l_i = (p_th.length-1); l_i >= 0; l_i--)
      {
      if(p_th[l_i] < p_id.get_fitness(l_i)) return true;
      }
    
    return false;
    }
  
/**
 * Obtain an individual received from other genetic peer to peer nodes.
 * <code>null</code> might be returned if currently no individual has been
 * received.
 * @return  An individual received from other genetic peers, or
 *          <code>null</code> if currently no individual is enqueued.
 */

  public  final Individual<Genotype>  immigrate ()
    {
    return this.m_incomming.nonblocking_dequeue();
    }

/**
 * This method must be called on each update to update the statistic data
 * of the search state. Make sure you call this method
 * <strong>after</code> updating the immigration statistics of the state!
 * @param p_bag The search state bag containing the state to be updated.
 */

  public synchronized final void  update_state  (final P2PStateBag p_bag)
    {
    p_bag.m_state.update_exchange(this.m_rejected_immigrants,
                                  this.m_emigrants);
    this.m_rejected_immigrants = 0;
    this.m_emigrants           = 0;
    }
  

/**
 * Increase the internal emigrant counter.
 * @param p_count The count of freshly emigrated individuals.
 */

  final synchronized  void  add_emigrants (final int p_count)
    {
    this.m_emigrants += p_count;
    }


/**
 * Startup the search. This method will be called by <code>start()</code>.
 * It first performs the inherited startup and then starts the emigrator.
 * @see #start()
 */

  @Override
  protected void  do_start  ()
    {
    super.do_start();
    // the emigrator is started only after the inherited startup has run
    this.m_emigrator.start();
    }
  
/**
 * Perform a shutdown. It is guaranteed to be called only once for shutdown.
 * This method should only perform the actions needed to shutdown the
 * network of activities. The closing of the activity itself will be done
 * by <code>do_abort</code>, which will subsequently be called
 * automatically.
 * <p>
 * Here, the inherited shutdown is performed and then the shutdown is
 * forwarded to the owning search engine.
 * </p>
 * @see #abort()
 * @see #shutdown()
 */

  @Override
  protected void  do_shutdown ()
    {
    super.do_shutdown();
    // propagate the shutdown to the search engine owning this node
    this.m_owner.shutdown();    
    }
  
/**
 * This method aborts the activity. It is guaranteed to be called only once
 * for aborting and once for shutdown.
 * <p>
 * After the inherited abort handling, the owning search engine, the
 * emigrator and both individual buffers are aborted, in this order.
 * </p>
 * @see #abort()
 * @see #shutdown()
 */

  @Override
  protected void  do_abort  ()
    {
    super.do_abort();
    // abort the owning search engine, then the emigrator
    this.m_owner.abort();
    this.m_emigrator.abort();
    // finally abort the incoming and the outgoing buffer
    this.m_incomming.abort();
    this.m_outgoing.abort();
    }

/**
 * Wait until this peer node has aborted. This performs the inherited
 * waiting first and then additionally waits for the emigrator.
 */

  @Override
  public  final void  wait_for  ()
    {
    super.wait_for();
    // the emigrator is a separate activity that is waited for on its own
    this.m_emigrator.wait_for();
    }


/**
 * Obtain a string representation of this peer-to-peer node.
 * @return A string representation of this peer-to-peer node.
 */

  @Override
  public  final String  toString()
    {
    return "Search " + super.toString();
    }
  }

File Information:

file name:SearchP2P.java
package:org.dgpf.search.api.p2p
qualified name:org.dgpf.search.api.p2p.SearchP2P.java
file type:Java Source File
download location:download http://dgpf.sourceforge.net/source/org/dgpf/search/api/p2p/SearchP2P.java
size:14.013 KB (14350 B)
uploaded: 2015-07-22 04:11:00 GMT+0000
last update: 2006-08-27 11:18:02 GMT+0000
last access: 2017-11-17 15:00:46 GMT+0000

statistics online since 2006-01-02.   RSS Feed
Contact us by sending an email to tweise@gmx.de to receive further information, to report errors, or to join our project.
All content on this site (http://dgpf.sourceforge.net/) is LGPL-licensed.
http://dgpf.sourceforge.net/scripts/source/source.php last modified at 2015-07-22 04:10:53 GMT+0000 served at 2017-11-17 21:18:28 GMT+0000.
Valid CSS Valid XHTML 1.1
Valid RSS SourceForge.net Logo