Logo
Distributed Genetic Programming Framework
print print

File org.sfc.net.p2p.PeerToPeer.java

Here you can find all the information about the file org.sfc.net.p2p.PeerToPeer.java. You may explore it here or download it onto your local disk.
/*
 * Copyright (c) 2006 Thomas Weise
 *
 * E-Mail           : tweise@gmx.de
 * Creation Date    : 2006-02-22 05:35:11
 * Original Filename: org.sfc.net.p2p.PeerToPeer.java
 * Version          : 3.1.0
 * Last modification: 2006-05-12
 *                by: Thomas Weise
 *
 * License          : GNU LESSER GENERAL PUBLIC LICENSE
 *                    Version 2.1, February 1999
 *                    You should have received a copy of this license along
 *                    with this library; if not, write to the Free Software
 *                    Foundation, Inc., 59 Temple Place, Suite 330, Boston,
 *                    MA 02111-1307, USA or download the license under
 *                    http://www.gnu.org/copyleft/lesser.html.
 *
 * Warranty         : This software is provided "as is" without any
 *                    warranty; without even the implied warranty of
 *                    merchantability or fitness for a particular purpose.
 *                    See the Gnu Lesser General Public License for more
 *                    details.
 */


package org.sfc.net.p2p;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Random;

import org.sfc.collections.Arrays;
import org.sfc.events.ErrorEvent;
import org.sfc.events.EventPropagator;
import org.sfc.net.Net;
import org.sfc.net.cs.Server;
import org.sfc.parallel.SfcThreadGroup;

/**
 * <p>
 * This class represents one peer-to-peer node. It allows you connect to
 * other nodes in a peer-to-peer network.
 * </p>
 *
 * <p>
 *  The central point of the whole thing is the <code>PeerToPeer</code>-
 *  class. It represents one single peer-to-peer node running on a computer.
 *  This node is client and server at the same time. It has the following
 *  features:</p>
 * <ol>
 * <li>Create a sparsely connected network of a random amount of
 *     computers.</li>
 * <li>Any peer might open a connection to any other peer it knows at any
 * time.</li>
 * <li>If connection is currently not possible, maybe due to
 *     traffic/computational overload, the peer is internally marked as
 *     faulty and enqueued into a list. In longer getting cycles, it is
 *     tried to connect to the peer again. After a long time without
 *   responses, the peer is removed from the list. Therefore, any new peer
 *   contacting your node will be used instead.</li>
 *  <li>Connections are accepted and buffered. If two many pending
 *  connections are buffered, older ones will be closed automatically,
 * leading to our node being marked as faulte on the connecting node.</li>
 * </ol>
 *
 * @author Thomas Weise
 */

public abstract class PeerToPeer extends  Server  implements IPeerToPeer
  {

/**
 * The shutdown command enforces all peers to end their activities.
 */

  static  final     byte        CMD_SHUTDOWN  = 0;
/**
 * Notifys a peer that another peer is ready to accept connections.
 */

  static  final     byte        CMD_SUBSCRIBE = (CMD_SHUTDOWN + 1);
/**
 * This command is sent when a socket to a peer should be opened and
 * processed.
 */

  static  final     byte        CMD_SOCKET    = (CMD_SUBSCRIBE + 1);

/**
 * The external peer list.
 */

  private volatile  External[]              m_peers   ;
/**
 * The count of peers stored.
 */

  private volatile  int                     m_peer_count  ;
/**
 * The peer list synchronizing object.
 */

  private final     Object                  m_peer_sync ;
/**
 * The manager for the pending externals.
 */

          final     PendingExternalsThread  m_pending   ;
/**
 * The socket thread.
 */

          final     SocketThread            m_st  ;
/**
 * The count of external peers to automatically subscribe because some
 * others have failed for too long. If an external peer cannot be reached
 * anymore (all retry-timeouts have ellapsed uselessly), it is dropped from
 * the peer list. The repair counter then will be increased. Then all
 * incomming connection will be checked if they come from yet unknown peers,
 * and if so, they will automatically be subscribed.
 */

                    int                     m_repair    ;
/**
 * The internal socket buffer.
 */

          final     SocketBuffer             m_sb  ;
/**
 * The internal randomizer.
 */

  private final     Random                   m_random ;

/**
 * Create a new peer-to-peer node.
 * @param p_threads The thread group all internal threads of this
 *                  peer-to-peer node should be members of.
 * @param p_events  The event propagator to use.
 * @param p_port    The port where the server component of this peer node
 *                  listens. If this value is <= 0, the default port will
 *                  be used.
 */

  protected PeerToPeer(SfcThreadGroup   p_threads,
                       EventPropagator  p_events,
                       int              p_port)
    {
    super(p_threads, p_events, p_port);

    this.m_peers      = new External[12];
    this.m_peer_sync  = new Object();

    this.m_pending    = new PendingExternalsThread(this);

    this.m_sb         = new SocketBuffer(this);
    this.m_st         = new SocketThread(thisthis.m_sb);
    this.m_random     = new Random();
    }


/**
 * This method will be called for all incomming sockets. It should do
 * whatever needs to be done.
 * @param p_socket  The incomming socket.
 * @throws  Throwable Whenever something goes wrong.
 */

  protected abstract  void  handle_socket (final Socket p_socket)
                                                      throws Throwable;


/**
 * Override this method to provide some additional behavior.
 * @see #start()
 */

  @Override
  protected void  do_start()
    {
    super.do_start();
    this.m_pending.start();
    this.m_st.start();
    }
  
  
/**
 * Perform a shutdown. It is guaranteed to be called only once for shutdown.
 * This method should only perform the actions needed to shutdown the
 * network of activities. The closing of the activity itself will be done
 * by <code>do_abort</code>, which will subsequently be called
 * automatically.
 * @see #abort()
 * @see #shutdown()
 */

  @Override
  protected void  do_shutdown ()
    {
    External[]      l_e;
    int             l_i;
    
    super.do_shutdown();
    
    synchronized(this.m_peer_sync)
      {
      l_e = this.m_peers;
      for(l_i = (this.m_peer_count-1); l_i >= 0; l_i--)
        {
        l_e[l_i].shutdown();
        }
      }

    this.m_pending.shutdown();
    }
  
/**
 * This method aborts the activity. It is guaranteed to be called only once
 * for aborting and once for shutdown.
 * @see #abort()
 * @see #shutdown()
 */

  @Override
  protected void  do_abort  ()
    {    
    super.do_abort();
    
    this.m_sb.abort();                 
    this.m_pending.abort();    
    this.m_peer_count = 0;  
    this.m_sb.abort();
    }


/**
 * Wait until this peer node has aborted.
 */

  @Override
  public  void  wait_for  ()
    {
    super.wait_for();
    this.m_pending.wait_for();
    this.m_st.wait_for();
    }

/**
 * Obtain a connection to a random one of the available peers.
 * @return  A socket open to an arbitrary peer, or <code>null</code> if
 *          no such connection could be created.
 */

  public  final Socket  get_random_peer ()
    {
    Socket    l_s;
    External  l_ex;
    int       l_c;

    while(this.is_running())
      {
      synchronized(this.m_peer_sync)
        {
        l_c = this.m_peer_count;
        if(l_c > 0)
          {
          l_ex = this.m_peers[this.m_random.nextInt(l_c)];
          }
        else
          {
          return null;
          }
        }

      l_s = l_ex.connect();
      if(l_s != null) return l_s;
      }

    return null;
    }

/**
 * Add a peer to the internal peer list.
 * @param p_port  The port where the peer listens, if this parameter is
 *                invalid, we will try to use the same port we're currently
 *                running on.
 * @param p_i     The internet address the peer is located at.
 * @return  <code>true</code> if and only if the peer was not yet known and
 *          has been added successfully.
 */

  public  final boolean add_peer  (final InetAddress p_i,
                                   final int         p_port)
    {
    return this.add_peer(new InetSocketAddress(p_i,
                                 ((p_port > 0) && (p_port < 65536) ?
                                   p_port : this.get_port())));
    }



/**
 * Add a peer to the internal peer list.
 * @param p_isa   The internet address and port the peer is located at.
 * @return  <code>true</code> if and only if the peer was not yet known and
 *          has been added successfully.
 */

  public  final boolean add_peer  (final InetSocketAddress p_isa)
    {
    External[]        l_l;
    int               l_c, l_i;

    synchronized(this.m_peer_sync)
      {
      l_c = this.m_peer_count;
      l_l = this.m_peers;

      if(p_isa.getAddress().equals(Net.LOCAL_HOST))
        {
        if(p_isa.getPort() == this.get_port()) return false;
        }
      
      for(l_i = (l_c-1); l_i >= 0; l_i--)
        {
        if(l_l[l_i].m_address.equals(p_isa)) return false;
        }

      l_l               = Arrays.insert_space(l_l, l_c, l_c, 1);
      l_l[l_c]          = new External(p_isa, this);
      this.m_peers      = l_l;
      this.m_peer_count = (l_c+1);

      if(this.m_repair > 0) this.m_repair--;
      }

    return true;
    }


/**
 * Remove a peer from the internal peer list.
 * @param p_isa   The internet address and port the peer is located at.
 * @return  <code>true</code> if and only if the peer was known and
 *          has been removed successfully.
 */

  public  final boolean remove_peer  (final InetSocketAddress p_isa)
    {
    External[]        l_l;
    int               l_c, l_i;

    synchronized(this.m_peer_sync)
      {
      l_c = this.m_peer_count;
      l_l = this.m_peers;

      for(l_i = (l_c-1); l_i >= 0; l_i--)
        {
        if(l_l[l_i].m_address.equals(p_isa))
          {
          this.m_peers      = Arrays.delete(l_l, l_c, l_i, 1);
          this.m_peer_count = (l_c-1);
          return true;
          }
        }
      }

    if(this.m_pending.remove_peer(p_isa)) return true;

// We need to perform this twice to avoid a possible deadlock/slip
    synchronized(this.m_peer_sync)
      {
      l_c = this.m_peer_count;
      l_l = this.m_peers;

      for(l_i = (l_c-1); l_i >= 0; l_i--)
        {
        if(l_l[l_i].m_address.equals(p_isa))
          {
          this.m_peers      = Arrays.delete(l_l, l_c, l_i, 1);
          this.m_peer_count = (l_c-1);
          return true;
          }
        }
      }

    return false;
    }

/**
 * Add an external peer representation to the internal peer list.
 * @param p_external  The peer to be added.
 */

  final void  add_external  (final External p_external)
    {
    External[]  l_l;
    int         l_c;

    synchronized(this.m_peer_sync)
      {
      l_c               = this.m_peer_count;
      l_l               = Arrays.insert_space(this.m_peers, l_c, l_c, 1);
      l_l[l_c]          = p_external;
      this.m_peers      = l_l;
      this.m_peer_count = (l_c+1);
      }
    }


/**
 * Remove an external peer representation from the internal peer list.
 * @param p_external  The peer to be removed.
 * @return  <code>true</code> if and only if the peer was contained in the
 *          list.
 */

  final boolean  remove_external  (final External p_external)
    {
    External[]  l_l;
    int         l_c, l_i;

    synchronized(this.m_peer_sync)
      {
      l_c               = (this.m_peer_count-1);
      l_l               = this.m_peers;

      for(l_i = l_c; l_i >= 0; l_i--)
        {
        if(l_l[l_i] == p_external)
          {
          this.m_peers      = Arrays.delete_fast(l_l, l_c+1, l_i, 1);
          this.m_peer_count = l_c;
          return true;
          }
        }
      }

    return false;
    }


/**
 * Subscribe to a peer-to-peer node.
 * @param p_to  Subscribe to this peer-to-peer node.
 */

  public  final void  subscribe (final InetSocketAddress p_to)
    {
    Socket          l_s;
    OutputStream    l_os;
    int             l_p;
    EventPropagator l_ep;

    l_s = new Socket();
    try
      {
      l_s.connect(p_to);
      try
        {
        l_os = l_s.getOutputStream();
        l_os.write(CMD_SUBSCRIBE);
        l_p = this.get_port();
        l_os.write( (l_p >>> 8) & 0xff );
        l_os.write( (l_p >>> 0) & 0xff );
        l_os.flush();
        }
      finally
        {
        this.close_socket(l_s);
        }
      }
    catch(Throwable l_t)
      {
      if((l_ep = this.get_event_propagator()) != null)
        {
        l_ep.receive(new ErrorEvent(p_to, l_t));
        }
      }
    }

/**
 * Obtain a string representation of this peer-to-peer node.
 * @return A string representation of this peer-to-peer node.
 */

  @Override
  public  String  toString()
    {
    return "P2P Node " + Net.LOCAL_HOST + " at port " + this.get_port();
    }
  
/**
 * Obtain the event propagator suitable for this server component.
 * @return The event propagator suitable for this server component.
 */

  final EventPropagator do_get_events ()
    {
    return this.get_event_propagator();
    }
  
/**
 * Obtain the thread group used by this server component.
 * @return The thread group used by this server component.
 */

  final SfcThreadGroup  do_get_thread_group()
    {
    return this.get_thread_group();
    }
  
/**
 * Close a socket.
 * @param p_socket  The socket to be closed.
 */

  final void  do_close_socket  (final Socket p_socket)
    {
    this.close_socket(p_socket);
    }
  
/**
 * This method is called by the server thread whenever it receives an
 * incomming connection.
 * @param p_socket  The socket to process.
 * @return  <code>false</code> if the socket should immediately be closed,
 *          <code>true</code> if it should stay open.
 * @throws  Throwable When something goes wrong.
 */

  @Override
  protected boolean process_socket (final Socket p_socket) throws Throwable
    {
    InputStream l_is;
    int         l_p;
    
    l_is = p_socket.getInputStream();
    switch(l_is.read())
      {
      case  PeerToPeer.CMD_SHUTDOWN:
        {
        this.shutdown();
        return false;
        }

      case  PeerToPeer.CMD_SOCKET:
        {
        l_p = (l_is.read() << 8) | (l_is.read() << 0);
        if(this.m_repair > 0)
          {
          this.add_peer(p_socket.getInetAddress(), l_p);
          }
  
        this.m_sb.enqueue(p_socket);
  
        return true;
        }
  
      case  PeerToPeer.CMD_SUBSCRIBE:
        {
        this.add_peer(p_socket.getInetAddress(),
                (l_is.read() << 8) | (l_is.read() << 0));
        return false;
        }
        
      defaultreturn false;
      }
    }
  }

File Information:

file name:PeerToPeer.java
package:org.sfc.net.p2p
qualified name:org.sfc.net.p2p.PeerToPeer.java
file type:Java Source File
download location:download http://dgpf.sourceforge.net/source/org/sfc/net/p2p/PeerToPeer.java
size:15.044 KB (15406 B)
uploaded: 2015-07-22 04:11:12 GMT+0000
last update: 2006-08-25 14:53:22 GMT+0000
last access: 2018-01-23 15:39:39 GMT+0000

statistics online since 2006-01-02.   RSS Feed
Contact us by sending an email to tweise@gmx.de to receive further information, to report errors, or to join our project.
All content on this site (http://dgpf.sourceforge.net/) is LGPL-licensed.
http://dgpf.sourceforge.net/scripts/source/source.php last modified at 2015-07-22 04:10:53 GMT+0000 served at 2018-01-23 15:39:39 GMT+0000.
Valid CSS Valid XHTML 1.1
Valid RSS SourceForge.net Logo