FileDocCategorySizeDatePackage
ClusterFileTransfer.javaAPI DocJBoss 4.2.124402Fri Jul 13 20:52:38 BST 2007org.jboss.ha.framework.server

ClusterFileTransfer

public class ClusterFileTransfer extends Object implements org.jboss.ha.framework.interfaces.HAPartition.AsynchHAMembershipListener
Handles transferring files on the cluster. Files are sent in small chunks at a time (up to MAX_CHUNK_BUFFER_SIZE bytes per cluster call).
author
Scott Marlow.
version
$Revision: 62037 $

Fields Summary
private static final int
MAX_CHUNK_BUFFER_SIZE
private Map
mPushsInProcess
private Map
mPullsInProcess
private org.jboss.ha.framework.interfaces.HAPartition
mPartition
private static final File
TEMP_DIRECTORY
private Map
mParentFolders
private static final String
SERVICE_NAME
private static final Logger
log
Constructors Summary
public ClusterFileTransfer(org.jboss.ha.framework.interfaces.HAPartition partition, Map destinationDirectoryMap)
Constructor needs the cluster partition and the mapping of server folder names to the java.io.File instance representing the physical folder.

param
partition represents the cluster.
param
destinationDirectoryMap is the mapping between server folder name and physical folder representation.


                                             
       
   
      this.mPartition = partition;
      this.mPartition.registerRPCHandler(SERVICE_NAME, this);
      this.mPartition.registerMembershipListener(this);
      mParentFolders = destinationDirectoryMap;
   
Methods Summary
private java.lang.StringCompositeKey(java.lang.String originNodeName, java.lang.String fileName)

      return originNodeName + "#" + fileName;
   
private java.io.FilegetParentFile(java.lang.String parentName)

      return (File) mParentFolders.get(parentName);
   
private static java.io.FilegetServerTempDir()

      return TEMP_DIRECTORY;
   
public static booleanlocalMove(java.io.File source, java.io.File destination)

   

             
      if(source.renameTo(destination))  // if we can simply rename the file
         return true;                   // return success
      // otherwise, copy source to destination
      OutputStream out = new FileOutputStream(destination);
      InputStream in = new FileInputStream(source);
      byte buffer[] = new byte[32*1024];
      int bytesRead = 0;
      while(bytesRead > -1) {  // until we hit end of source file
         bytesRead = in.read(buffer);
         if(bytesRead > 0) {
            out.write(buffer,0, bytesRead);
         }
      }
      in.close();
      out.close();
      if(!source.delete())
         logMessage("Could not delete file "+ source);
      return true;
   
private static voidlogException(java.lang.Throwable e)

      //e.printStackTrace();
      log.error(e);
   
private static voidlogMessage(java.lang.String message)

      log.info(message);
   
public voidmembershipChanged(java.util.Vector deadMembers, java.util.Vector newMembers, java.util.Vector allMembers)
Called when a new partition topology occurs. see HAPartition.AsynchHAMembershipListener

param
deadMembers A list of nodes that have died since the previous view
param
newMembers A list of nodes that have joined the partition since the previous view
param
allMembers A list of nodes that built the current view

      // Are there any deadMembers contained in mPushsInProcess or in mPullsInProcess.
      // If so, cancel operations for them.
      // If contained in mPushsInProcess, then we can stop waiting for the rest of the file transfer.
      // If contained in mPullsInProcess, then we can stop supplying for the rest of the file transfer.

      if (mPushsInProcess.size() > 0)
      {
         synchronized(mPushsInProcess)
         {
            Collection values = mPushsInProcess.values();
            Iterator iter = values.iterator();
            while (iter.hasNext())
            {
               FilePushOperation push = (FilePushOperation)iter.next();
               if (deadMembers.contains(push.getOriginatingNode()))
               {
                  // cancel the operation and remove the operation from mPushsInProcess
                  push.cancel();
                  iter.remove();
               }
            }
         }
      }

      if (mPullsInProcess.size() > 0)
      {
         synchronized(mPullsInProcess)
         {
            Collection values = mPullsInProcess.values();
            Iterator iter = values.iterator();
            while(iter.hasNext())
            {
               FilePullOperation pull = (FilePullOperation)iter.next();
               if (deadMembers.contains(pull.getFileChunk().getOriginatingNode()))
               {
                  // cancel the operation and remove the operation from mPullsInProcess
                  pull.cancel();
                  iter.remove();
               }
            }
         }
      }
   
   /**
    * Get specified file from the cluster.  The file is requested chunk by
    * chunk from the cluster coordinator node, staged in the server temp
    * directory, and finally moved into the destination folder named by
    * parentName.
    *
    * @param file identifies the file to get from the cluster.
    * @param parentName is the parent folder name for the file on both source
    *        and destination nodes.
    * @throws ClusterFileTransferException on any remote, I/O, or move failure.
    */
   public void pull(java.io.File file, java.lang.String parentName) throws ClusterFileTransferException
   {
      String myNodeName = this.mPartition.getNodeName();
      ClusterNode myNodeAddress = this.mPartition.getClusterNode();
      FileOutputStream output = null;
      try
      {
         log.info("Start pull of file " + file.getName() + " from cluster.");
         // ask the coordinator to open the file; the reply carries the first chunk
         ArrayList response = mPartition.callMethodOnCoordinatorNode(SERVICE_NAME,
            "remotePullOpenFile",
            new Object[]{file, myNodeName, myNodeAddress,  parentName}, new Class[]{java.io.File.class, java.lang.String.class,ClusterNode.class, java.lang.String.class},
            true);

         if (response == null || response.size() < 1)
         {
            throw new ClusterFileTransferException("Did not receive response from remote machine trying to open file '" + file + "'.  Check remote machine error log.");
         }
         
         FileContentChunk fileChunk = (FileContentChunk) response.get(0);
         if(null == fileChunk)
         {
            // remote side returns null when its open/read failed (see remotePullOpenFile)
            throw new ClusterFileTransferException("An error occured on remote machine trying to open file '" + file + "'.  Check remote machine error log.");
         }

         // stage the download in the temp dir; only moved into place once complete
         File tempFile = new File(ClusterFileTransfer.getServerTempDir(), file.getName());
         output = new FileOutputStream(tempFile);

         // get the remote file modification time and change our local copy to have the same time.
         long lastModification = fileChunk.lastModified();
         // loop: write the chunk we have, then request the next one; a chunk
         // with mByteCount <= 0 signals end of file
         while (fileChunk.mByteCount > 0)
         {
            output.write(fileChunk.mChunk, 0, fileChunk.mByteCount);
            response = mPartition.callMethodOnCoordinatorNode(SERVICE_NAME,
               "remotePullReadFile",
               new Object[]{file, myNodeName}, new Class[]{java.io.File.class, java.lang.String.class},
               true);
            if (response.size() < 1)
            {
               // remote likely went down mid-transfer; discard the partial temp file
               if(! tempFile.delete())
                  throw new ClusterFileTransferException("An error occured on remote machine trying to read file '" + file + "'.  Is remote still running?  Also, we couldn't delete temp file "+ tempFile.getName());
               throw new ClusterFileTransferException("An error occured on remote machine trying to read file '" + file + "'.  Is remote still running?");
            }
            fileChunk = (FileContentChunk) response.get(0);
            if (null == fileChunk)
            {
               // remote read failed; discard the partial temp file
               if( !tempFile.delete())
                  throw new ClusterFileTransferException("An error occured on remote machine trying to read file '" + file + "'.  Check remote machine error log.  Also, we couldn't delete temp file "+ tempFile.getName());
               throw new ClusterFileTransferException("An error occured on remote machine trying to read file '" + file + "'.  Check remote machine error log.");
            }
         }
         output.close();
         output = null; // signal to the finally block that the stream is already closed
         File target = new File(getParentFile(parentName), file.getName());
         if (target.exists()) {
            if(!target.delete())
               throw new ClusterFileTransferException("The destination file "+ target + " couldn't be deleted, the updated application will not be copied to this node");

         }
         // mirror the source file's timestamp before moving into place
         tempFile.setLastModified(lastModification);
         if (!localMove(tempFile,target))
         {
            throw new ClusterFileTransferException("Could not move " + tempFile + " to " + target);
         }
         log.info("Finished cluster pull of file " + file.getName() + " to "+ target.getName());
      }
      catch(IOException e)
      {
         throw new ClusterFileTransferException(e);
      }
      catch(ClusterFileTransferException e)
      {
         throw e;
      }
      catch(Exception e)
      {
         throw new ClusterFileTransferException(e);
      }
      finally {
         if( output != null) {
            try {
               output.close();
            }
            catch(IOException e) {logException(e);}  // we are already in the middle of a throw if output isn't null.
         }
      }
   }
public voidpush(java.io.File file, java.lang.String parentName, boolean leaveInTempFolder)
Send specified file to cluster.

param
file is the file to send.
param
leaveInTempFolder is true if the file should be left in the server temp folder.
throws
ClusterFileTransferException

      File target = new File(getParentFile(parentName), file.getName());

      log.info("Start push of file " + file.getName() + " to cluster.");
      // check if trying to send explored archive (cannot send subdirectories)
      if (target.isDirectory())
      {
         // let the user know why we are skipping this file and return.
         logMessage("You cannot send the contents of directories, consider archiving folder containing" + target.getName() + " instead.");
         return;
      }
      ClusterNode myNodeAddress = this.mPartition.getClusterNode();
      FileContentChunk fileChunk = new FileContentChunk(target, this.mPartition.getNodeName(), myNodeAddress);
      try
      {
         InputStream input = fileChunk.openInputFile();
         while (fileChunk.readNext(input) >= 0)
         {
            mPartition.callMethodOnCluster(SERVICE_NAME, "remotePushWriteFile", new Object[]{fileChunk, parentName}, new Class[]{fileChunk.getClass(), java.lang.String.class}, true);
         }
         // tell remote(s) to close the output file
         mPartition.callMethodOnCluster(SERVICE_NAME, "remotePushCloseFile", new Object[]{fileChunk, new Boolean(leaveInTempFolder), parentName}, new Class[]{fileChunk.getClass(), Boolean.class, java.lang.String.class}, true);
         input.close();
         log.info("Finished push of file " + file.getName() + " to cluster.");
      }
      catch(FileNotFoundException e)
      {
         throw new ClusterFileTransferException(e);
      }
      catch(IOException e)
      {
         throw new ClusterFileTransferException(e);
      }
      catch(Exception e)
      {
         throw new ClusterFileTransferException(e);
      }
   
   /**
    * This is remotely called by {@link #pull(File, String)} to open the file
    * on the machine that the file is being copied from, registering a
    * FilePullOperation keyed by origin node + file name for subsequent
    * remotePullReadFile calls.
    *
    * @param file is the file to pull.
    * @param originNodeName is the cluster node that is requesting the file.
    * @param originNode is the cluster address of the requesting node.
    * @param parentName is the parent folder name for the file on both source
    *        and destination nodes.
    * @return FileContentChunk containing the first part of the file read
    *         after opening it, or null if opening/reading failed (the error
    *         is logged locally).
    */
   public FileContentChunk remotePullOpenFile(java.io.File file, java.lang.String originNodeName, org.jboss.ha.framework.interfaces.ClusterNode originNode, java.lang.String parentName)
   {
      try
      {
         File target = new File(getParentFile(parentName), file.getName());
         FileContentChunk fileChunk = new FileContentChunk(target, originNodeName,originNode);
         FilePullOperation filePullOperation = new FilePullOperation(fileChunk);
         // save the operation for the next call to remoteReadFile
         this.mPullsInProcess.put(CompositeKey(originNodeName, file.getName()), filePullOperation);
         filePullOperation.openInputFile();
         fileChunk.readNext(filePullOperation.getInputStream());
         // NOTE(review): if openInputFile/readNext throws after the put above,
         // the map entry appears to be left behind until membershipChanged
         // cleans it up — confirm intended.
         return fileChunk;
      } catch (IOException e)
      {
         logException(e);
      } catch (Exception e)
      {
         logException(e);
      }
      // null tells the calling node that the open failed; it raises
      // ClusterFileTransferException on its side
      return null;
   }
public org.jboss.ha.framework.server.ClusterFileTransfer$FileContentChunkremotePullReadFile(java.io.File file, java.lang.String originNodeName)
This is remotely called by {@link #pull(File, String )} to read the file on the machine that the file is being copied from.

param
file is the file to pull.
param
originNodeName is the cluster node that is requesting the file.
return
FileContentChunk containing the next part of the file read.

      try
      {
         FilePullOperation filePullOperation = (FilePullOperation) this.mPullsInProcess.get(CompositeKey(originNodeName, file.getName()));
         filePullOperation.getFileChunk().readNext(filePullOperation.getInputStream());
         if (filePullOperation.getFileChunk().mByteCount < 1)
         {
            // last call to read, so clean up
            filePullOperation.getInputStream().close();
            this.mPullsInProcess.remove(CompositeKey(originNodeName, file.getName()));
         }
         return filePullOperation.getFileChunk();
      } catch (IOException e)
      {
         logException(e);
      }
      return null;
   
   /**
    * Remote method for closing the file just transmitted and, unless asked to
    * leave it in the temp folder, moving it into the destination folder named
    * by parentName.
    *
    * @param fileChunk identifies the transfer (originating node + destination file).
    * @param leaveInTempFolder is true if we should leave the file in the server temp folder.
    * @param parentName is the server folder name the file should end up in.
    */
   public void remotePushCloseFile(FileContentChunk fileChunk, java.lang.Boolean leaveInTempFolder, java.lang.String parentName)
   {
      try
      {
         // remove() both fetches and forgets the in-flight operation
         FilePushOperation filePushOperation = (FilePushOperation) mPushsInProcess.remove(CompositeKey(fileChunk.getOriginatingNodeName(), fileChunk.getDestinationFile().getName()));

         // null operation means we never saw the start of this transfer (see remotePushWriteFile)
         if ((filePushOperation != null) && (filePushOperation.getOutputStream() != null))
         {
            filePushOperation.getOutputStream().close();
            if (!leaveInTempFolder.booleanValue())
            {
               File tempFile = new File(ClusterFileTransfer.getServerTempDir(), fileChunk.getDestinationFile().getName());
               File target = new File(getParentFile(parentName), fileChunk.getDestinationFile().getName());
               // replace any existing copy; failures are logged, not thrown,
               // since this runs as a remote callback
               if (target.exists())
                  if(!target.delete())
                     logMessage("Could not delete target file " + target);

               // mirror the source file's timestamp before moving into place
               tempFile.setLastModified(fileChunk.lastModified());
               if (!localMove(tempFile,target))
               {
                  logMessage("Could not move " + tempFile + " to " + target);
               }
            }
         }
      } catch (IOException e)
      {
         logException(e);
      }
   }
public voidremotePushWriteFile(org.jboss.ha.framework.server.ClusterFileTransfer$FileContentChunk fileChunk, java.lang.String parentName)
Remote method for writing file a fragment at a time.

param
fileChunk

      try
      {
         String key = CompositeKey(fileChunk.getOriginatingNodeName(), fileChunk.getDestinationFile().getName());
         FilePushOperation filePushOperation = (FilePushOperation) mPushsInProcess.get(key);

         // handle first call to write
         if (filePushOperation == null)
         {
            if (fileChunk.mChunkNumber != FileContentChunk.FIRST_CHUNK)
            {
               // we joined the cluster after the file transfer started
               logMessage("Ignoring file transfer of '" + fileChunk.getDestinationFile().getName() + "' from " + fileChunk.getOriginatingNodeName() + ", we missed the start of it.");
               return;
            }
            filePushOperation = new FilePushOperation(fileChunk.getOriginatingNodeName(), fileChunk.getOriginatingNode());
            File tempFile = new File(ClusterFileTransfer.getServerTempDir(), fileChunk.getDestinationFile().getName());
            filePushOperation.openOutputFile(tempFile);
            mPushsInProcess.put(key, filePushOperation);
         }
         filePushOperation.getOutputStream().write(fileChunk.mChunk, 0, fileChunk.mByteCount);
      } catch (FileNotFoundException e)
      {
         logException(e);
      } catch (IOException e)
      {
         logException(e);
      }