ClusterFileTransferpublic class ClusterFileTransfer extends Object implements org.jboss.ha.framework.interfaces.HAPartition.AsynchHAMembershipListenerHandles transfering files on the cluster. Files are sent in small chunks at a time (up to MAX_CHUNK_BUFFER_SIZE bytes per
Cluster call). |
Fields Summary |
---|
private static final int | MAX_CHUNK_BUFFER_SIZE | private Map | mPushsInProcess | private Map | mPullsInProcess | private org.jboss.ha.framework.interfaces.HAPartition | mPartition | private static final File | TEMP_DIRECTORY | private Map | mParentFolders | private static final String | SERVICE_NAME | private static final Logger | log |
Constructors Summary |
---|
public ClusterFileTransfer(org.jboss.ha.framework.interfaces.HAPartition partition, Map destinationDirectoryMap)Constructor needs the cluster partition and the mapping of server folder names to the java.io.File instance
representing the physical folder.
this.mPartition = partition;
this.mPartition.registerRPCHandler(SERVICE_NAME, this);
this.mPartition.registerMembershipListener(this);
mParentFolders = destinationDirectoryMap;
|
Methods Summary |
---|
private java.lang.String | CompositeKey(java.lang.String originNodeName, java.lang.String fileName)
return originNodeName + "#" + fileName;
| private java.io.File | getParentFile(java.lang.String parentName)
return (File) mParentFolders.get(parentName);
| private static java.io.File | getServerTempDir()
return TEMP_DIRECTORY;
| public static boolean | localMove(java.io.File source, java.io.File destination)
if(source.renameTo(destination)) // if we can simply rename the file
return true; // return success
// otherwise, copy source to destination
OutputStream out = new FileOutputStream(destination);
InputStream in = new FileInputStream(source);
byte buffer[] = new byte[32*1024];
int bytesRead = 0;
while(bytesRead > -1) { // until we hit end of source file
bytesRead = in.read(buffer);
if(bytesRead > 0) {
out.write(buffer,0, bytesRead);
}
}
in.close();
out.close();
if(!source.delete())
logMessage("Could not delete file "+ source);
return true;
| private static void | logException(java.lang.Throwable e)
//e.printStackTrace();
log.error(e);
| private static void | logMessage(java.lang.String message)
log.info(message);
| public void | membershipChanged(java.util.Vector deadMembers, java.util.Vector newMembers, java.util.Vector allMembers)Called when a new partition topology occurs. see HAPartition.AsynchHAMembershipListener
// Are there any deadMembers contained in mPushsInProcess or in mPullsInProcess.
// If so, cancel operations for them.
// If contained in mPushsInProcess, then we can stop waiting for the rest of the file transfer.
// If contained in mPullsInProcess, then we can stop supplying for the rest of the file transfer.
if (mPushsInProcess.size() > 0)
{
synchronized(mPushsInProcess)
{
Collection values = mPushsInProcess.values();
Iterator iter = values.iterator();
while (iter.hasNext())
{
FilePushOperation push = (FilePushOperation)iter.next();
if (deadMembers.contains(push.getOriginatingNode()))
{
// cancel the operation and remove the operation from mPushsInProcess
push.cancel();
iter.remove();
}
}
}
}
if (mPullsInProcess.size() > 0)
{
synchronized(mPullsInProcess)
{
Collection values = mPullsInProcess.values();
Iterator iter = values.iterator();
while(iter.hasNext())
{
FilePullOperation pull = (FilePullOperation)iter.next();
if (deadMembers.contains(pull.getFileChunk().getOriginatingNode()))
{
// cancel the operation and remove the operation from mPullsInProcess
pull.cancel();
iter.remove();
}
}
}
}
| public void | pull(java.io.File file, java.lang.String parentName)Get specified file from the cluster.
String myNodeName = this.mPartition.getNodeName();
ClusterNode myNodeAddress = this.mPartition.getClusterNode();
FileOutputStream output = null;
try
{
log.info("Start pull of file " + file.getName() + " from cluster.");
ArrayList response = mPartition.callMethodOnCoordinatorNode(SERVICE_NAME,
"remotePullOpenFile",
new Object[]{file, myNodeName, myNodeAddress, parentName}, new Class[]{java.io.File.class, java.lang.String.class,ClusterNode.class, java.lang.String.class},
true);
if (response == null || response.size() < 1)
{
throw new ClusterFileTransferException("Did not receive response from remote machine trying to open file '" + file + "'. Check remote machine error log.");
}
FileContentChunk fileChunk = (FileContentChunk) response.get(0);
if(null == fileChunk)
{
throw new ClusterFileTransferException("An error occured on remote machine trying to open file '" + file + "'. Check remote machine error log.");
}
File tempFile = new File(ClusterFileTransfer.getServerTempDir(), file.getName());
output = new FileOutputStream(tempFile);
// get the remote file modification time and change our local copy to have the same time.
long lastModification = fileChunk.lastModified();
while (fileChunk.mByteCount > 0)
{
output.write(fileChunk.mChunk, 0, fileChunk.mByteCount);
response = mPartition.callMethodOnCoordinatorNode(SERVICE_NAME,
"remotePullReadFile",
new Object[]{file, myNodeName}, new Class[]{java.io.File.class, java.lang.String.class},
true);
if (response.size() < 1)
{
if(! tempFile.delete())
throw new ClusterFileTransferException("An error occured on remote machine trying to read file '" + file + "'. Is remote still running? Also, we couldn't delete temp file "+ tempFile.getName());
throw new ClusterFileTransferException("An error occured on remote machine trying to read file '" + file + "'. Is remote still running?");
}
fileChunk = (FileContentChunk) response.get(0);
if (null == fileChunk)
{
if( !tempFile.delete())
throw new ClusterFileTransferException("An error occured on remote machine trying to read file '" + file + "'. Check remote machine error log. Also, we couldn't delete temp file "+ tempFile.getName());
throw new ClusterFileTransferException("An error occured on remote machine trying to read file '" + file + "'. Check remote machine error log.");
}
}
output.close();
output = null;
File target = new File(getParentFile(parentName), file.getName());
if (target.exists()) {
if(!target.delete())
throw new ClusterFileTransferException("The destination file "+ target + " couldn't be deleted, the updated application will not be copied to this node");
}
tempFile.setLastModified(lastModification);
if (!localMove(tempFile,target))
{
throw new ClusterFileTransferException("Could not move " + tempFile + " to " + target);
}
log.info("Finished cluster pull of file " + file.getName() + " to "+ target.getName());
}
catch(IOException e)
{
throw new ClusterFileTransferException(e);
}
catch(ClusterFileTransferException e)
{
throw e;
}
catch(Exception e)
{
throw new ClusterFileTransferException(e);
}
finally {
if( output != null) {
try {
output.close();
}
catch(IOException e) {logException(e);} // we are already in the middle of a throw if output isn't null.
}
}
| public void | push(java.io.File file, java.lang.String parentName, boolean leaveInTempFolder)Send specified file to cluster.
File target = new File(getParentFile(parentName), file.getName());
log.info("Start push of file " + file.getName() + " to cluster.");
// check if trying to send explored archive (cannot send subdirectories)
if (target.isDirectory())
{
// let the user know why we are skipping this file and return.
logMessage("You cannot send the contents of directories, consider archiving folder containing" + target.getName() + " instead.");
return;
}
ClusterNode myNodeAddress = this.mPartition.getClusterNode();
FileContentChunk fileChunk = new FileContentChunk(target, this.mPartition.getNodeName(), myNodeAddress);
try
{
InputStream input = fileChunk.openInputFile();
while (fileChunk.readNext(input) >= 0)
{
mPartition.callMethodOnCluster(SERVICE_NAME, "remotePushWriteFile", new Object[]{fileChunk, parentName}, new Class[]{fileChunk.getClass(), java.lang.String.class}, true);
}
// tell remote(s) to close the output file
mPartition.callMethodOnCluster(SERVICE_NAME, "remotePushCloseFile", new Object[]{fileChunk, new Boolean(leaveInTempFolder), parentName}, new Class[]{fileChunk.getClass(), Boolean.class, java.lang.String.class}, true);
input.close();
log.info("Finished push of file " + file.getName() + " to cluster.");
}
catch(FileNotFoundException e)
{
throw new ClusterFileTransferException(e);
}
catch(IOException e)
{
throw new ClusterFileTransferException(e);
}
catch(Exception e)
{
throw new ClusterFileTransferException(e);
}
| public org.jboss.ha.framework.server.ClusterFileTransfer$FileContentChunk | remotePullOpenFile(java.io.File file, java.lang.String originNodeName, org.jboss.ha.framework.interfaces.ClusterNode originNode, java.lang.String parentName)This is remotely called by {@link #pull(File , String )} to open the file on the machine that
the file is being copied from.
try
{
File target = new File(getParentFile(parentName), file.getName());
FileContentChunk fileChunk = new FileContentChunk(target, originNodeName,originNode);
FilePullOperation filePullOperation = new FilePullOperation(fileChunk);
// save the operation for the next call to remoteReadFile
this.mPullsInProcess.put(CompositeKey(originNodeName, file.getName()), filePullOperation);
filePullOperation.openInputFile();
fileChunk.readNext(filePullOperation.getInputStream());
return fileChunk;
} catch (IOException e)
{
logException(e);
} catch (Exception e)
{
logException(e);
}
return null;
| public org.jboss.ha.framework.server.ClusterFileTransfer$FileContentChunk | remotePullReadFile(java.io.File file, java.lang.String originNodeName)This is remotely called by {@link #pull(File, String )} to read the file on the machine that the file is being
copied from.
try
{
FilePullOperation filePullOperation = (FilePullOperation) this.mPullsInProcess.get(CompositeKey(originNodeName, file.getName()));
filePullOperation.getFileChunk().readNext(filePullOperation.getInputStream());
if (filePullOperation.getFileChunk().mByteCount < 1)
{
// last call to read, so clean up
filePullOperation.getInputStream().close();
this.mPullsInProcess.remove(CompositeKey(originNodeName, file.getName()));
}
return filePullOperation.getFileChunk();
} catch (IOException e)
{
logException(e);
}
return null;
| public void | remotePushCloseFile(org.jboss.ha.framework.server.ClusterFileTransfer$FileContentChunk fileChunk, java.lang.Boolean leaveInTempFolder, java.lang.String parentName)Remote method for closing the file just transmitted.
try
{
FilePushOperation filePushOperation = (FilePushOperation) mPushsInProcess.remove(CompositeKey(fileChunk.getOriginatingNodeName(), fileChunk.getDestinationFile().getName()));
if ((filePushOperation != null) && (filePushOperation.getOutputStream() != null))
{
filePushOperation.getOutputStream().close();
if (!leaveInTempFolder.booleanValue())
{
File tempFile = new File(ClusterFileTransfer.getServerTempDir(), fileChunk.getDestinationFile().getName());
File target = new File(getParentFile(parentName), fileChunk.getDestinationFile().getName());
if (target.exists())
if(!target.delete())
logMessage("Could not delete target file " + target);
tempFile.setLastModified(fileChunk.lastModified());
if (!localMove(tempFile,target))
{
logMessage("Could not move " + tempFile + " to " + target);
}
}
}
} catch (IOException e)
{
logException(e);
}
| public void | remotePushWriteFile(org.jboss.ha.framework.server.ClusterFileTransfer$FileContentChunk fileChunk, java.lang.String parentName)Remote method for writing file a fragment at a time.
try
{
String key = CompositeKey(fileChunk.getOriginatingNodeName(), fileChunk.getDestinationFile().getName());
FilePushOperation filePushOperation = (FilePushOperation) mPushsInProcess.get(key);
// handle first call to write
if (filePushOperation == null)
{
if (fileChunk.mChunkNumber != FileContentChunk.FIRST_CHUNK)
{
// we joined the cluster after the file transfer started
logMessage("Ignoring file transfer of '" + fileChunk.getDestinationFile().getName() + "' from " + fileChunk.getOriginatingNodeName() + ", we missed the start of it.");
return;
}
filePushOperation = new FilePushOperation(fileChunk.getOriginatingNodeName(), fileChunk.getOriginatingNode());
File tempFile = new File(ClusterFileTransfer.getServerTempDir(), fileChunk.getDestinationFile().getName());
filePushOperation.openOutputFile(tempFile);
mPushsInProcess.put(key, filePushOperation);
}
filePushOperation.getOutputStream().write(fileChunk.mChunk, 0, fileChunk.mByteCount);
} catch (FileNotFoundException e)
{
logException(e);
} catch (IOException e)
{
logException(e);
}
|
|