Logo
Distributed Genetic Programming Framework
print print

File org.dgpf.search.api.cs.ClientThread.java

Here you can find all the information about the file org.dgpf.search.api.cs.ClientThread.java. You may explore it here or download it onto your local disk.
/*
 * Copyright (c) 2006 Thomas Weise
 * 
 * E-Mail           : tweise@gmx.de
 * Creation Date    : 2006-04-11  16:28:38
 * Original Filename: org.dgpf.search.api.cs.ClientThread.java
 * Version          : 2.1.1
 * Last modification: 2006-06-08
 *                by: Thomas Weise
 * 
 * License          : GNU LESSER GENERAL PUBLIC LICENSE
 *                    Version 2.1, February 1999
 *                    You should have received a copy of this license along
 *                    with this library; if not, write to the Free Software
 *                    Foundation, Inc., 59 Temple Place, Suite 330, Boston,
 *                    MA 02111-1307, USA or download the license under
 *                    http://www.gnu.org/copyleft/lesser.html.
 *                    
 * Warranty         : This software is provided "as is" without any
 *                    warranty; without even the implied warranty of
 *                    merchantability or fitness for a particular purpose.
 *                    See the Gnu Lesser General Public License for more
 *                    details.
 */

 
package org.dgpf.search.api.cs;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.net.Socket;

import org.dgpf.search.api.SearchTask;
import org.dgpf.search.api.SearchTaskThread;
import org.sfc.events.ErrorEvent;
import org.sfc.events.IEventListener;
import org.sfc.net.Reconnection;

/**
 * A client thread belongs to each server known and is responsible for the
 * data exchange with that server.
 * 
 * @param <Genotype>    The sort of genotype used to represent individuals.
 *                      This must be a serializable type.
 *
 * @author Thomas Weise
 */

final class ClientThread<Genotype extends Serializable>
                                  extends SearchTaskThread<Genotype>
  {
/**
 * The owning search client.
 */

  private final     SearchClient<Genotype>  m_owner ;
/**
 * The socket address to connect to.
 */

          final     InetSocketAddress       m_addr  ;
/**
 * The socket representing the connection.
 */

  private           Socket                  m_socket  ;
/**
 * The socket's lifetime.
 */

  private           int                     m_lifetime  ; 
/**
 * The object output stream to be used.
 */

  private           ObjectOutputStream      m_oos; 
/**
 * The object input stream to be used.
 */

  private           ObjectInputStream       m_ois;
/**
 * The client reconnection facility.
 */

  private final     ClientReconnection      m_recon ;
/**
 * The time to wait.
 */

  private volatile  long                    m_time_to_wait  ;
  
/**
 * Create a new client thread.
 * @param p_owner The owning search client.
 * @param p_addr  The socket address to connect to.
 */

  ClientThread (final SearchClient<Genotype>  p_owner,
                final InetSocketAddress       p_addr)
    {
    super(p_owner.do_get_search_engine(),
          p_owner.m_queue);
    this.m_owner = p_owner;
    this.m_addr  = p_addr;
    this.m_recon = new ClientReconnection();
    }
  

/**
 * This method will be called whenever a new search task becomes available.
 * @param p_task  The search task that just became available.
 */

  @Override
  protected final void  process (final SearchTask<Genotype> p_task)
    {    
    synchronized(this)
      {
      try
        {
        this.ensure_connection(CSSearchUtils.CMD_TASK);
        this.m_recon.do_reset();
        this.m_oos.writeUnshared(p_task);
        this.m_oos.flush();
        SearchTask.receive_result(this.m_ois, p_task);
        }
      catch(Throwable l_t)
        {
        this.reject(p_task);
        this.m_recon.do_call_error(l_t);
        }
      }
    
    if(this.m_time_to_wait != 0)
      {
      this.safe_sleep(this.m_time_to_wait);
      this.m_time_to_wait = 0;
      }
    }
  
/**
 * Transmit the search settings needed to configure the servers.
 * @param p_settings  The settings to be transmitted.
 */

  public  synchronized  final
                  void transmit_settings(final Serializable p_settings)
    {
    try
      {
      this.ensure_connection(CSSearchUtils.CMD_SETTINGS);
      this.m_recon.do_reset();
      this.m_oos.writeUnshared(p_settings);
      }
    catch(Throwable l_t)
      {
      this.m_recon.do_call_error(l_t);
      }
    }
  
/**
 * Shutdown this client plus the server it belongs to. 
 */

  public  final void  shutdown()
    {
    try
      {
      synchronized(this)
        {
        this.close_connection();//TODO: validate ith this is needed
        
        try
          {
          this.ensure_connection(CSSearchUtils.CMD_SHUTDOWN);
          this.m_socket.getOutputStream().flush();
          }
        catch(Throwable l_t)
          {
          //
          }
        finally
          {        
          this.close_connection();
          }      
        }
      }
    finally
      {    
      this.abort();
      }
    }
  
/**
 * This method ensures that the correct connection is available.
 * @param p_command The command to be executed.
 * @throws  Throwable If something goes wrong.
 */

  private final void ensure_connection (final byte p_command)
    throws Throwable
    {
    Socket        l_s;
    OutputStream  l_os;
    
    if( (--this.m_lifetime) <= 0 ) this.close_connection();
    
    l_s = this.m_socket;
    if(l_s == null)
      {
      l_s = new Socket();
      l_s.connect(this.m_addr);
      this.m_socket = l_s;
      this.m_lifetime = CSSearchUtils.DEFAULT_LIFETIME;
      }
    
    if((l_os = this.m_oos) == null) l_os = l_s.getOutputStream();
    l_os.write(p_command);
        
    if(p_command <= CSSearchUtils.CMD_QUERY_THREADS) return ;
    
    if(this.m_oos == null)
      {
      this.m_oos = new ObjectOutputStream(l_os);
      this.m_oos.flush();
      }
    
    //if(p_command <= CSSearchUtils.CMD_SETTINGS) return ;
    
    if(this.m_ois == null)
      {      
      l_os.flush();
      this.m_ois = new ObjectInputStream(l_s.getInputStream());
      }
    }
  
/**
 * Close the connection.
 */

  private final void  close_connection()
    {
    if(this.m_ois != null)
      {
      try
        {
        this.m_ois.close();
        }
      catch(Throwable l_t)
        {
        //
        }
      finally
        {
        this.m_ois = null;
        }
      }
    
    if(this.m_oos != null)
      {
      try
        {
        this.m_oos.close();
        }
      catch(Throwable l_t)
        {
        //
        }
      finally
        {
        this.m_oos = null;
        }
      }
    
    if(this.m_socket != null)
      {
      try
        {
        this.m_socket.close();
        }
      catch(Throwable l_t)
        {
        //
        }
      finally
        {
        this.m_socket = null;
        }
      }
    }
  
/**
 * This method is called when the connection failed.
 * @param p_error         The error raised because of the connection
 *                        failure.
 * @param p_should_retry  <code>true</code> if and only if we should try to
 *                        reconnect, <code>false</code> if we have already
 *                        tried for a long time and now should abort this
 *                        useless attempt.
 * @param p_time_to_wait  The time to wait until the next connection retry
 *                        should be performed.
 */

  protected final void  on_error  (final Throwable  p_error,
                                   final boolean    p_should_retry,
                                   final long       p_time_to_wait)
    {
    IEventListener l_ep;
    
    this.m_time_to_wait = p_time_to_wait;
    
    if(p_error != null)
      {
      l_ep = this.get_listener();    
      if(l_ep != null)
        {
        l_ep.receive(new ErrorEvent(this.m_addr, p_error));
        }
      }
    
    this.close_connection();
    
    if(!p_should_retry)
      {
      this.abort();
      synchronized(this.m_owner)
        {
        this.m_owner.m_clients.remove(this);
        }
      }
    }
  
/**
 * Override this method to perform something when the thread dies.
 */

  @Override
  protected final void  after_termination ()
    {
    this.close_connection();
    super.after_termination();
    }
  
/**
 * The client version of the reconnection class.
 *
 * @author Thomas Weise
 */

  private final class ClientReconnection  extends Reconnection<Object>
    {
/**
 * Reset the client reconnection.
 */

    final void  do_reset  ()
      {
      this.reset();
      }
    
/**
 * This method must be implemented by sub-classes. It should establish the
 * connection and may fail with an arbitrary error.
 * @return  The connection-representing object if successful.
 */

    @Override
    protected final  Object  do_connect ()
      {
      return null;
      }
  
/**
 * This method is called when the connection failed.
 * @param p_error         The error raised because of the connection
 *                        failure.
 * @param p_should_retry  <code>true</code> if and only if we should try to
 *                        reconnect, <code>false</code> if we have already
 *                        tried for a long time and now should abort this
 *                        useless attempt.
 * @param p_time_to_wait  The time to wait until the next connection retry
 *                        should be performed.
 */

    @Override
    protected final void  on_error  (final Throwable  p_error,
                                     final boolean    p_should_retry,
                                     final long       p_time_to_wait)
      {
      ClientThread.this.on_error(p_error, p_should_retry, p_time_to_wait);
      }
    

/**
 * This method is used to call the <code>on_error</code> method with the
 * correct parameters.
 * @param p_t The error caught.
 */

    final void  do_call_error(final Throwable p_t)
      {
      this.call_error(p_t);
      }
    }
  

/**
 * The human readable name of the activity.
 * @return The human readable name of the activity.
 */

  @Override
  public  final  String  toString  ()
    {
    return "Client Thread (" + this.m_owner + ')';
    }
  }

File Information:

file name:ClientThread.java
package:org.dgpf.search.api.cs
qualified name:org.dgpf.search.api.cs.ClientThread.java
file type:Java Source File
download location:download http://dgpf.sourceforge.net/source/org/dgpf/search/api/cs/ClientThread.java
size:10.319 KB (10567 B)
uploaded: 2015-07-22 04:11:00 GMT+0000
last update: 2006-07-15 09:30:33 GMT+0000
last access: 2017-11-19 04:58:07 GMT+0000

statistics online since 2006-01-02.   RSS Feed
Contact us by sending an email to tweise@gmx.de to receive further information, to report errors, or to join our project.
All content on this site (http://dgpf.sourceforge.net/) is LGPL-licensed.
http://dgpf.sourceforge.net/scripts/source/source.php last modified at 2015-07-22 04:10:53 GMT+0000 served at 2017-11-19 04:58:07 GMT+0000.
Valid CSS Valid XHTML 1.1
Valid RSS SourceForge.net Logo