File: ClusterFileTransfer.java — API Doc — JBoss 4.2.1 — 24402 bytes — Fri Jul 13 20:52:38 BST 2007 — Package: org.jboss.ha.framework.server

/*
 * JBoss, Home of Professional Open Source.
 * Copyright 2006, Red Hat Middleware LLC, and individual contributors
 * as indicated by the @author tags. See the copyright.txt file in the
 * distribution for a full listing of individual contributors.
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 *
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this software; if not, write to the Free
 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 */
package org.jboss.ha.framework.server;

import org.jboss.ha.framework.interfaces.HAPartition;
import org.jboss.ha.framework.interfaces.ClusterNode;
import org.jboss.system.server.ServerConfigLocator;
import org.jboss.logging.Logger;
import org.jboss.ha.framework.interfaces.HAPartition.AsynchHAMembershipListener;

import java.util.*;
import java.io.*;

/**
 * Handles transfering files on the cluster.  Files are sent in small chunks at a time (up to MAX_CHUNK_BUFFER_SIZE bytes per
 * Cluster call).
 *
 * @author <a href="mailto:smarlow@novell.com">Scott Marlow</a>.
 * @version $Revision: 62037 $
 */
public class ClusterFileTransfer implements AsynchHAMembershipListener {

   // Specify max file transfer buffer size that we read and write at a time.
   // This influences the number of times that we will invoke disk read/write file
   // operations versus how much memory we will consume for a file transfer.
   private static final int MAX_CHUNK_BUFFER_SIZE = 512 * 1024;

   // In-progress file push operations that this node is RECEIVING,
   // keyed by originNodeName + "#" + fileName (see CompositeKey).
   private Map mPushsInProcess = Collections.synchronizedMap(new HashMap());

   // In-progress file pull operations that this node is SERVING to other
   // nodes, same composite key scheme as mPushsInProcess.
   private Map mPullsInProcess = Collections.synchronizedMap(new HashMap());

   // The cluster partition this service is registered with, both as RPC
   // handler (under SERVICE_NAME) and as membership listener.
   private HAPartition mPartition;

   // Server temp directory, resolved once at class-load time; incoming files
   // are staged here before being moved to their destination folder.
   private static final File TEMP_DIRECTORY = ServerConfigLocator.locate().getServerTempDir();

   // Mapping between parent folder name and target destination folder
   // the search key is the parent folder name and value is the java.io.File.
   // We don't synchronize on the mParentFolders as we assume its safe to read it.
   private Map mParentFolders = null;

   // Name under which this instance's remote methods are registered on the partition.
   private static final String SERVICE_NAME = ClusterFileTransfer.class.getName() + "Service";

   private static final Logger log = Logger.getLogger(ClusterFileTransfer.class.getName());

   /**
    * Create a transfer service bound to the given cluster partition.  The new
    * instance registers itself on the partition both as the RPC handler for
    * {@link #SERVICE_NAME} and as a membership listener, so remote calls and
    * topology changes are routed here.
    *
    * @param partition represents the cluster.
    * @param destinationDirectoryMap maps server folder names to the physical
    *        java.io.File folders they correspond to.
    */
   public ClusterFileTransfer(HAPartition partition, Map destinationDirectoryMap)
   {
      // store state first so it is in place before remote calls can arrive
      this.mParentFolders = destinationDirectoryMap;
      this.mPartition = partition;
      partition.registerRPCHandler(SERVICE_NAME, this);
      partition.registerMembershipListener(this);
   }

   /**
    * Get specified file from the cluster coordinator node.
    *
    * The file content is streamed in chunks (up to MAX_CHUNK_BUFFER_SIZE bytes
    * per cluster call) into a temp file under the server temp directory, given
    * the remote file's modification time, and finally moved into the folder
    * identified by parentName.
    *
    * @param file identifies the file to get from the cluster.
    * @param parentName is the parent folder name for the file on both source and destination nodes.
    * @throws ClusterFileTransferException if the remote open/read fails, the
    *         existing target cannot be replaced, or the local move fails.
    */
   public void pull(File file, String parentName) throws ClusterFileTransferException
   {
      String myNodeName = this.mPartition.getNodeName();
      ClusterNode myNodeAddress = this.mPartition.getClusterNode();
      FileOutputStream output = null;
      try
      {
         log.info("Start pull of file " + file.getName() + " from cluster.");
         ArrayList response = mPartition.callMethodOnCoordinatorNode(SERVICE_NAME,
            "remotePullOpenFile",
            new Object[]{file, myNodeName, myNodeAddress,  parentName}, new Class[]{java.io.File.class, java.lang.String.class,ClusterNode.class, java.lang.String.class},
            true);

         if (response == null || response.size() < 1)
         {
            throw new ClusterFileTransferException("Did not receive response from remote machine trying to open file '" + file + "'.  Check remote machine error log.");
         }

         FileContentChunk fileChunk = (FileContentChunk) response.get(0);
         if(null == fileChunk)
         {
            throw new ClusterFileTransferException("An error occured on remote machine trying to open file '" + file + "'.  Check remote machine error log.");
         }

         File tempFile = new File(ClusterFileTransfer.getServerTempDir(), file.getName());
         output = new FileOutputStream(tempFile);

         // get the remote file modification time and change our local copy to have the same time.
         long lastModification = fileChunk.lastModified();
         while (fileChunk.mByteCount > 0)
         {
            output.write(fileChunk.mChunk, 0, fileChunk.mByteCount);
            response = mPartition.callMethodOnCoordinatorNode(SERVICE_NAME,
               "remotePullReadFile",
               new Object[]{file, myNodeName}, new Class[]{java.io.File.class, java.lang.String.class},
               true);
            // FIX: also guard against a null response here; previously only the
            // size was checked, so a null reply produced a NullPointerException
            // instead of the intended descriptive exception.
            if (response == null || response.size() < 1)
            {
               if(! tempFile.delete())
                  throw new ClusterFileTransferException("An error occured on remote machine trying to read file '" + file + "'.  Is remote still running?  Also, we couldn't delete temp file "+ tempFile.getName());
               throw new ClusterFileTransferException("An error occured on remote machine trying to read file '" + file + "'.  Is remote still running?");
            }
            fileChunk = (FileContentChunk) response.get(0);
            if (null == fileChunk)
            {
               if( !tempFile.delete())
                  throw new ClusterFileTransferException("An error occured on remote machine trying to read file '" + file + "'.  Check remote machine error log.  Also, we couldn't delete temp file "+ tempFile.getName());
               throw new ClusterFileTransferException("An error occured on remote machine trying to read file '" + file + "'.  Check remote machine error log.");
            }
         }
         output.close();
         output = null;
         File target = new File(getParentFile(parentName), file.getName());
         if (target.exists()) {
            if(!target.delete())
               throw new ClusterFileTransferException("The destination file "+ target + " couldn't be deleted, the updated application will not be copied to this node");

         }
         // give the local copy the remote file's modification time before the move
         tempFile.setLastModified(lastModification);
         if (!localMove(tempFile,target))
         {
            throw new ClusterFileTransferException("Could not move " + tempFile + " to " + target);
         }
         log.info("Finished cluster pull of file " + file.getName() + " to "+ target.getName());
      }
      catch(IOException e)
      {
         throw new ClusterFileTransferException(e);
      }
      catch(ClusterFileTransferException e)
      {
         throw e;
      }
      catch(Exception e)
      {
         throw new ClusterFileTransferException(e);
      }
      finally {
         if( output != null) {
            try {
               output.close();
            }
            catch(IOException e) {logException(e);}  // we are already in the middle of a throw if output isn't null.
         }
      }
   }

   /**
    * This is remotely called by {@link #pull(File , String )} to open the file on the machine that
    * the file is being copied from.
    *
    * @param file is the file to pull.
    * @param originNodeName is the cluster node that is requesting the file.
    * @param originNode is the address of the requesting cluster node.
    * @param parentName     is the parent folder name for the file on both source and destination nodes.
    * @return FileContentChunk containing the first part of the file read after opening it,
    *         or null if the open/read failed (the caller checks the remote error log).
    */
   public FileContentChunk remotePullOpenFile(File file, String originNodeName, ClusterNode originNode, String parentName)
   {
      String key = CompositeKey(originNodeName, file.getName());
      try
      {
         File target = new File(getParentFile(parentName), file.getName());
         FileContentChunk fileChunk = new FileContentChunk(target, originNodeName, originNode);
         FilePullOperation filePullOperation = new FilePullOperation(fileChunk);
         // save the operation for the next call to remotePullReadFile
         this.mPullsInProcess.put(key, filePullOperation);
         filePullOperation.openInputFile();
         fileChunk.readNext(filePullOperation.getInputStream());
         return fileChunk;
      }
      catch (Exception e)
      {
         // FIX: a failed open/read previously left a stale entry (and possibly
         // an open stream) in mPullsInProcess; clean it up before reporting.
         FilePullOperation stale = (FilePullOperation) this.mPullsInProcess.remove(key);
         if (stale != null && stale.getInputStream() != null)
         {
            try
            {
               stale.getInputStream().close();
            }
            catch (IOException ioe)
            {
               logException(ioe);
            }
         }
         logException(e);
      }
      return null;
   }

   /**
    * This is remotely called by {@link #pull(File, String )} to read the file on the machine that the file is being
    * copied from.
    *
    * @param file is the file to pull.
    * @param originNodeName is the cluster node that is requesting the file.
    * @return FileContentChunk containing the next part of the file read, or null on error.
    */
   public FileContentChunk remotePullReadFile(File file, String originNodeName)
   {
      try
      {
         FilePullOperation filePullOperation = (FilePullOperation) this.mPullsInProcess.get(CompositeKey(originNodeName, file.getName()));
         // FIX: the operation may be absent (e.g. cancelled by a membership
         // change, or this node never handled the open call); previously this
         // dereferenced null and threw a NullPointerException.
         if (filePullOperation == null)
         {
            logMessage("No file pull operation in progress for '" + file.getName() + "' requested by node " + originNodeName);
            return null;
         }
         filePullOperation.getFileChunk().readNext(filePullOperation.getInputStream());
         if (filePullOperation.getFileChunk().mByteCount < 1)
         {
            // last call to read, so clean up
            filePullOperation.getInputStream().close();
            this.mPullsInProcess.remove(CompositeKey(originNodeName, file.getName()));
         }
         return filePullOperation.getFileChunk();
      } catch (IOException e)
      {
         logException(e);
      }
      return null;
   }

   /**
    * Send specified file to cluster.
    *
    * The file is read locally a chunk at a time and broadcast to all nodes via
    * remotePushWriteFile calls, followed by a remotePushCloseFile broadcast.
    *
    * @param file is the file to send.
    * @param parentName is the parent folder name for the file on both source and destination nodes.
    * @param leaveInTempFolder is true if the file should be left in the server temp folder.
    * @throws ClusterFileTransferException if the file cannot be read or a cluster call fails.
    */
   public void push(File file, String parentName, boolean leaveInTempFolder) throws ClusterFileTransferException
   {
      File target = new File(getParentFile(parentName), file.getName());

      log.info("Start push of file " + file.getName() + " to cluster.");
      // check if trying to send explored archive (cannot send subdirectories)
      if (target.isDirectory())
      {
         // let the user know why we are skipping this file and return.
         logMessage("You cannot send the contents of directories, consider archiving folder containing" + target.getName() + " instead.");
         return;
      }
      ClusterNode myNodeAddress = this.mPartition.getClusterNode();
      FileContentChunk fileChunk = new FileContentChunk(target, this.mPartition.getNodeName(), myNodeAddress);
      InputStream input = null;
      try
      {
         input = fileChunk.openInputFile();
         while (fileChunk.readNext(input) >= 0)
         {
            mPartition.callMethodOnCluster(SERVICE_NAME, "remotePushWriteFile", new Object[]{fileChunk, parentName}, new Class[]{fileChunk.getClass(), java.lang.String.class}, true);
         }
         // tell remote(s) to close the output file
         mPartition.callMethodOnCluster(SERVICE_NAME, "remotePushCloseFile", new Object[]{fileChunk, Boolean.valueOf(leaveInTempFolder), parentName}, new Class[]{fileChunk.getClass(), Boolean.class, java.lang.String.class}, true);
         log.info("Finished push of file " + file.getName() + " to cluster.");
      }
      catch(Exception e)
      {
         // FileNotFoundException/IOException were caught separately before but
         // all alternatives wrapped identically, so one handler suffices.
         throw new ClusterFileTransferException(e);
      }
      finally
      {
         // FIX: previously the input stream leaked if a cluster call threw;
         // always close it here.
         if (input != null)
         {
            try
            {
               input.close();
            }
            catch (IOException e)
            {
               logException(e);
            }
         }
      }
   }


   /**
    * Remote method for writing file a fragment at a time.
    *
    * On the first chunk (mChunkNumber == FIRST_CHUNK) a temp output file is
    * opened in the server temp directory and the operation is recorded in
    * mPushsInProcess; subsequent chunks append to that stream.  A node that
    * joined mid-transfer (no recorded operation and a non-first chunk) ignores
    * the transfer.  I/O failures are logged, not propagated to the caller.
    *
    * @param fileChunk the chunk to write; identifies the transfer by originating node and file name.
    * @param parentName the server folder name the file belongs under (unused here; the final
    *        move happens in remotePushCloseFile).
    */
   public void remotePushWriteFile(FileContentChunk fileChunk, String parentName)
   {
      try
      {
         String key = CompositeKey(fileChunk.getOriginatingNodeName(), fileChunk.getDestinationFile().getName());
         FilePushOperation filePushOperation = (FilePushOperation) mPushsInProcess.get(key);

         // handle first call to write
         if (filePushOperation == null)
         {
            if (fileChunk.mChunkNumber != FileContentChunk.FIRST_CHUNK)
            {
               // we joined the cluster after the file transfer started
               logMessage("Ignoring file transfer of '" + fileChunk.getDestinationFile().getName() + "' from " + fileChunk.getOriginatingNodeName() + ", we missed the start of it.");
               return;
            }
            filePushOperation = new FilePushOperation(fileChunk.getOriginatingNodeName(), fileChunk.getOriginatingNode());
            File tempFile = new File(ClusterFileTransfer.getServerTempDir(), fileChunk.getDestinationFile().getName());
            filePushOperation.openOutputFile(tempFile);
            mPushsInProcess.put(key, filePushOperation);
         }
         filePushOperation.getOutputStream().write(fileChunk.mChunk, 0, fileChunk.mByteCount);
      } catch (FileNotFoundException e)
      {
         logException(e);
      } catch (IOException e)
      {
         logException(e);
      }
   }

   /**
    * Remote method for closing the file just transmitted and, unless asked to
    * leave it in the temp folder, moving it into its destination folder with
    * the originating file's modification time.
    *
    * @param fileChunk identifies the transfer (originating node + file name).
    * @param leaveInTempFolder is true if we should leave the file in the server temp folder
    * @param parentName the server folder name the file should be moved under.
    */
   public void remotePushCloseFile(FileContentChunk fileChunk, Boolean leaveInTempFolder, String parentName)
   {
      try
      {
         String fileName = fileChunk.getDestinationFile().getName();
         String key = CompositeKey(fileChunk.getOriginatingNodeName(), fileName);
         FilePushOperation pushOperation = (FilePushOperation) mPushsInProcess.remove(key);

         // nothing to close if we never saw (or already abandoned) this transfer
         if (pushOperation == null || pushOperation.getOutputStream() == null)
         {
            return;
         }

         pushOperation.getOutputStream().close();
         if (leaveInTempFolder.booleanValue())
         {
            return;
         }

         File tempFile = new File(ClusterFileTransfer.getServerTempDir(), fileName);
         File target = new File(getParentFile(parentName), fileName);
         if (target.exists() && !target.delete())
         {
            logMessage("Could not delete target file " + target);
         }
         // carry the sender's modification time onto the received copy
         tempFile.setLastModified(fileChunk.lastModified());
         if (!localMove(tempFile, target))
         {
            logMessage("Could not move " + tempFile + " to " + target);
         }
      }
      catch (IOException e)
      {
         logException(e);
      }
   }

   /** Called when a new partition topology occurs. see HAPartition.AsynchHAMembershipListener
    *
    * Cancels any in-progress transfer whose peer died: a push we were
    * receiving from a dead node will never complete, and a pull we were
    * serving to a dead node no longer has a consumer.
    *
    * @param deadMembers A list of nodes that have died since the previous view
    * @param newMembers A list of nodes that have joined the partition since the previous view
    * @param allMembers A list of nodes that built the current view
    */
   public void membershipChanged(Vector deadMembers, Vector newMembers, Vector allMembers)
   {
      if (!mPushsInProcess.isEmpty())
      {
         // must hold the map lock while iterating a synchronizedMap view
         synchronized (mPushsInProcess)
         {
            for (Iterator it = mPushsInProcess.values().iterator(); it.hasNext();)
            {
               FilePushOperation pushOp = (FilePushOperation) it.next();
               if (deadMembers.contains(pushOp.getOriginatingNode()))
               {
                  // cancel the operation and drop it from mPushsInProcess
                  pushOp.cancel();
                  it.remove();
               }
            }
         }
      }

      if (!mPullsInProcess.isEmpty())
      {
         synchronized (mPullsInProcess)
         {
            for (Iterator it = mPullsInProcess.values().iterator(); it.hasNext();)
            {
               FilePullOperation pullOp = (FilePullOperation) it.next();
               if (deadMembers.contains(pullOp.getFileChunk().getOriginatingNode()))
               {
                  // cancel the operation and drop it from mPullsInProcess
                  pullOp.cancel();
                  it.remove();
               }
            }
         }
      }
   }

   /** @return the server temp directory, resolved once at class-load time. */
   private static File getServerTempDir()
   {
      return TEMP_DIRECTORY;
   }

   /**
    * Look up the physical folder registered for a server folder name.
    *
    * @param parentName the server folder name used as the map key.
    * @return the mapped java.io.File, or null if the name is unknown.
    */
   private File getParentFile(String parentName)
   {
      Object folder = mParentFolders.get(parentName);
      return (File) folder;
   }

   /**
    * Build the map key that identifies one transfer: origin node name and
    * file name joined by '#'.
    *
    * @param originNodeName name of the node that initiated the transfer.
    * @param fileName name of the file being transferred.
    * @return the composite key string.
    */
   private String CompositeKey(String originNodeName, String fileName)
   {
      StringBuffer key = new StringBuffer(originNodeName);
      key.append('#').append(fileName);
      return key.toString();
   }

   /** Log an informational message through the class logger. */
   private static void logMessage(String message)
   {
      log.info(message);
   }

   /**
    * Log a caught exception at error level.
    *
    * @param e the exception to record.
    */
   private static void logException(Throwable e)
   {
      // FIX: pass the throwable as the second argument so the full stack trace
      // is logged; log.error(e) only recorded e.toString().
      log.error("Cluster file transfer error", e);
   }


   /**
    * Tracks the receiving side of one in-progress push: which node the file is
    * coming from and the local temp file being written.
    */
   private static class FilePushOperation {

      // stream for the local file incoming chunks are appended to
      private OutputStream mOutput;
      // name of the node sending the file
      private String mOriginNodeName;
      // address of the node sending the file
      private ClusterNode mOriginNode;
      // the local file being written
      private File mOutputFile;

      public FilePushOperation(String originNodeName, ClusterNode originNode)
      {
         mOriginNodeName = originNodeName;
         mOriginNode = originNode;
      }

      /**
       * Open the local file that incoming chunks will be written to.
       *
       * @param file the temp file to create/overwrite.
       * @throws FileNotFoundException if the file cannot be opened for writing.
       */
      public void openOutputFile(File file) throws FileNotFoundException
      {
         mOutput = new FileOutputStream(file);
         mOutputFile = file;
      }

      /**
       * Cancel the file push operation.  To be called locally on each machine that is
       * receiving the file.  Closes the output stream and deletes the partial file.
       */
      public void cancel()
      {
         ClusterFileTransfer.logMessage("Canceling receive of file " + mOutputFile + " as remote server "+mOriginNodeName+" left the cluster.  Partial results will be deleted.");
         try
         {
            // close the output stream and delete the file.
            mOutput.close();
            if(!mOutputFile.delete())
            {
               logMessage("Could not delete output file " + mOutputFile);
            }
         }
         catch(IOException e)
         {
            logException(e);
         }
      }

      /**
       * Get the IPAddress of the cluster node that is pushing file to the cluster.
       * @return IPAddress
       */
      public ClusterNode getOriginatingNode()
      {
         return mOriginNode;
      }

      /** @return the output stream for the temp file, or null before openOutputFile. */
      public OutputStream getOutputStream()
      {
         return mOutput;
      }
   }

   /**
    * Tracks the serving side of one in-progress pull: the chunk state being
    * sent and the local input stream it is read from.
    */
   private static class FilePullOperation {

      // chunk holder describing the file and carrying each read's bytes
      private FileContentChunk mFileChunk;
      // stream over the local file being served
      private InputStream mInput;

      public FilePullOperation(FileContentChunk fileChunk)
      {
         mFileChunk = fileChunk;
      }

      /**
       * Open the local file described by the chunk for reading.
       *
       * @throws FileNotFoundException if the file cannot be opened.
       */
      public void openInputFile() throws FileNotFoundException
      {
         mInput = mFileChunk.openInputFile();
      }

      /** @return the input stream over the local file, or null before openInputFile. */
      public InputStream getInputStream()
      {
         return mInput;
      }

      /**
       * Cancel the file pull operation.  To be called locally on the machine that is supplying the file.
       */
      public void cancel()
      {
         logMessage("Canceling send of file " + mFileChunk.getDestinationFile() + " as remote server "+mFileChunk.getOriginatingNodeName()+" left the cluster.");
         try
         {
            mInput.close();
         }
         catch(IOException e)
         {
            logException(e);
         }
      }

      /** @return the chunk state for this operation. */
      public FileContentChunk getFileChunk()
      {
         return mFileChunk;
      }
   }

   /**
    * For representing filetransfer state on the wire.
    * The inputStream or OutputStream is expected to be maintained by the sender/receiver.
    *
    * Each instance is serialized per cluster call; mChunk/mByteCount carry the
    * current fragment and mChunkNumber orders the fragments (starting at
    * FIRST_CHUNK).  Do not change the field layout: instances cross the wire
    * between nodes under the fixed serialVersionUID below.
    */
   private static class FileContentChunk implements Serializable {

      /**
       * Create the chunk holder for one file transfer.
       *
       * @param file the file being transferred (also the destination name on receivers).
       * @param originNodeName name of the node initiating the transfer.
       * @param originNode address of the node initiating the transfer.
       */
      public FileContentChunk(File file, String originNodeName, ClusterNode originNode)
      {
         this.mDestinationFile = file;
         this.mLastModified = file.lastModified();
         this.mOriginNode = originNode;
         this.mOriginNodeName = originNodeName;
         mChunkNumber = 0;
         long size = file.length();
         if (size > MAX_CHUNK_BUFFER_SIZE)
            size = MAX_CHUNK_BUFFER_SIZE;
         else if (size <= 0)
            size = 1;   // keep a minimum 1-byte buffer so read() works on empty files
         mChunk = new byte[(int) size];     // set amount transferred at a time
         mByteCount = 0;
      }

      /**
       * Get the name of the cluster node that started the file transfer operation
       *
       * @return node name
       */
      public String getOriginatingNodeName()
      {
         return this.mOriginNodeName;
      }

      /**
       * Get the address of the cluster node that started the file transfer operation.
       * @return ClusterNode
       */
      public ClusterNode getOriginatingNode()
      {
         return mOriginNode;
      }

      /** @return the file this chunk stream describes. */
      public File getDestinationFile()
      {
         return this.mDestinationFile;
      }

      /**
       * Open input file
       *
       * @return an input stream over the destination file.
       * @throws FileNotFoundException
       */
      public InputStream openInputFile() throws FileNotFoundException
      {
         return new FileInputStream(this.mDestinationFile);
      }

      /**
       * Open output file in the server temp directory, named after the destination file.
       *
       * @return an output stream for the temp copy.
       * @throws FileNotFoundException
       */
      public OutputStream openOutputFile() throws FileNotFoundException
      {
         File lFile = new File(ClusterFileTransfer.getServerTempDir(), this.mDestinationFile.getName());
         FileOutputStream output = new FileOutputStream(lFile);
         return output;
      }

      /**
       * Read the next fragment into mChunk and advance the chunk counter.
       *
       * @return number of bytes read (-1 at end of stream, mirroring InputStream.read)
       * @throws IOException
       */
      public int readNext(InputStream input) throws IOException
      {
         this.mChunkNumber++;
         this.mByteCount = input.read(this.mChunk);
         return this.mByteCount;
      }

      /** @return the source file's modification time captured at construction. */
      public long lastModified()
      {
         return mLastModified;
      }

      // fixed UID: instances are exchanged between nodes of different uptimes
      static final long serialVersionUID = 3546447481674749363L;
      private File mDestinationFile;
      private long mLastModified;
      private String mOriginNodeName;
      private ClusterNode mOriginNode;
      private int mChunkNumber;       // 1-based fragment counter
      private static final int FIRST_CHUNK = 1;
      private byte[] mChunk;          // fragment payload buffer
      private int mByteCount;         // valid bytes in mChunk for this fragment
   }

   /**
    * Move source file to destination.  Tries a simple rename first; if that
    * fails (e.g. the files are on different file systems), copies the content,
    * carries over the source's modification time, and deletes the source.
    *
    * @param source the file to move.
    * @param destination where to move it (overwritten if present).
    * @return true (a failure to delete the source after a copy is only logged).
    * @throws FileNotFoundException if either file cannot be opened for the copy.
    * @throws IOException if the copy fails.
    */
   public static boolean localMove(File source, File destination) throws FileNotFoundException, IOException {
      if(source.renameTo(destination))  // if we can simply rename the file
         return true;                   // return success
      // otherwise, copy source to destination
      OutputStream out = null;
      InputStream in = null;
      try {
         out = new FileOutputStream(destination);
         in = new FileInputStream(source);
         byte buffer[] = new byte[32*1024];
         int bytesRead = 0;
         while(bytesRead > -1) {  // until we hit end of source file
            bytesRead = in.read(buffer);
            if(bytesRead > 0) {
               out.write(buffer,0, bytesRead);
            }
         }
      }
      finally {
         // FIX: close both streams even when the copy throws, so a failed copy
         // no longer leaks file handles.
         if (in != null) {
            try { in.close(); } catch (IOException e) { logException(e); }
         }
         if (out != null) {
            try { out.close(); } catch (IOException e) { logException(e); }
         }
      }
      // FIX: a rename preserves the timestamp callers set on the source, but a
      // copy does not; carry it over so both paths behave the same.
      destination.setLastModified(source.lastModified());
      if(!source.delete())
         logMessage("Could not delete file "+ source);
      return true;
   }

   /**
    * Exception wrapper class
    *
    * Thrown by {@link #pull(File, String)} and {@link #push(File, String, boolean)}
    * to report transfer failures; typically wraps the underlying IOException or
    * remote-call exception.  No explicit serialVersionUID is declared —
    * NOTE(review): adding one now would break serialization compatibility with
    * nodes running the existing computed UID; confirm before changing.
    */
   public static class ClusterFileTransferException extends Exception
   {
      /** @param message describes the failure. */
      public ClusterFileTransferException(String message)
      {
         super(message);
      }

      /**
       * @param message describes the failure.
       * @param cause the underlying exception.
       */
      public ClusterFileTransferException(String message, Throwable cause)
      {
         super(message, cause);
      }

      /** @param cause the underlying exception being wrapped. */
      public ClusterFileTransferException(Throwable cause)
      {
         super(cause);
      }
   }
}