File | Doc | Category | Size | Date | Package
CacheFileManagerImpl.java | API Doc | Azureus 3.0.3.4 | 22253 | Mon Aug 27 16:05:40 BST 2007 | com.aelitis.azureus.core.diskmanager.cache.impl

CacheFileManagerImpl

public class CacheFileManagerImpl extends Object implements CacheFileManager, AEDiagnosticsEvidenceGenerator
author
parg

Fields Summary
private static final LogIDs
LOGID
public static final boolean
DEBUG
public static final int
CACHE_CLEANER_TICKS
public static final int
STATS_UPDATE_FREQUENCY
public static final long
DIRTY_CACHE_WRITE_MAX_AGE
protected boolean
cache_enabled
protected boolean
cache_read_enabled
protected boolean
cache_write_enabled
protected long
cache_size
protected long
cache_files_not_smaller_than
protected long
cache_minimum_free_size
protected long
cache_space_free
private long
cache_file_id_next
protected FMFileManager
file_manager
protected WeakHashMap
cache_files
protected WeakHashMap
updated_cache_files
protected LinkedHashMap
cache_entries
protected CacheFileManagerStatsImpl
stats
protected Map
torrent_to_cache_file_map
protected long
cache_bytes_written
protected long
cache_bytes_read
protected long
file_bytes_written
protected long
file_bytes_read
protected long
cache_read_count
protected long
cache_write_count
protected long
file_read_count
protected long
file_write_count
protected AEMonitor
this_mon
private long
cleaner_ticks
Constructors Summary
public CacheFileManagerImpl()

		// Registers this manager as a diagnostics evidence generator, obtains the
		// file manager singleton, reads the disk-cache configuration and passes it
		// to initialise(). A non-positive configured size disables caching.
		
		AEDiagnostics.addEvidenceGenerator( this );
		
		file_manager	= FMFileManagerFactory.getSingleton();
		
		boolean	enabled	= COConfigurationManager.getBooleanParameter( "diskmanager.perf.cache.enable" );

		boolean	enable_read 	= COConfigurationManager.getBooleanParameter( "diskmanager.perf.cache.enable.read" );
		
		boolean	enable_write	= COConfigurationManager.getBooleanParameter( "diskmanager.perf.cache.enable.write" );
		
			// units are MB. The scaling is done in long arithmetic: in int arithmetic a
			// setting of 2048 MB or more overflows and yields a negative or bogus size
			// (initialise() accepts the size as a long anyway)
		
		long	size			= 1024L*1024L*COConfigurationManager.getIntParameter( "diskmanager.perf.cache.size" );
		
			// units are KB, widened to long before scaling for the same reason
			// NOTE(review): this key lacks the "diskmanager.perf.cache." prefix used by
			// the other parameters read here - confirm that is intended
		
		long	not_smaller_than	= 1024L*COConfigurationManager.getIntParameter( "notsmallerthan" );
		
		if ( size <= 0 ){
		
			Debug.out( "Invalid cache size parameter (" + size + "), caching disabled" );
			
			enabled	= false;
		}
		
		initialise( enabled, enable_read, enable_write, size, not_smaller_than );
	
Methods Summary
protected voidaddCacheSpace(CacheEntry new_entry)

		// Records new_entry in cache_entries and debits its length from
		// cache_space_free, all while holding this_mon. When DEBUG is set the
		// manager's bookkeeping is cross-checked against the owning file's own
		// cache and a CacheFileManagerException is thrown on any mismatch.
		
		try{
			this_mon.enter();
			
			cache_space_free	= cache_space_free - new_entry.getLength();
			
			cache_entries.put( new_entry, new_entry );
			
			if ( DEBUG ){
				
				final CacheFileWithCache	owner	= new_entry.getFile();
				
				long	bytes_held		= 0;
				
				int		owner_entries	= 0;
				
					// one pass over every entry: total the bytes held and count
					// how many entries belong to the new entry's file
				
				for ( Iterator walker = cache_entries.keySet().iterator(); walker.hasNext(); ){
					
					CacheEntry	candidate = (CacheEntry)walker.next();
					
					bytes_held	+= candidate.getLength();
					
					if ( candidate.getFile() == owner ){
						
						owner_entries++;
					}
				}
				
					// the file's own entry count must agree with ours
				
				if ( owner_entries != owner.cache.size()){
					
					Debug.out( "Cache inconsistency: my count = " + owner_entries + ", file = " + owner.cache.size());
					
					throw( new CacheFileManagerException( null, "Cache inconsistency: counts differ"));
				}
				
					// bytes held by entries must equal the space we believe is in use
				
				if ( bytes_held != cache_size - cache_space_free ){
					
					Debug.out( "Cache inconsistency: used_size = " + bytes_held + ", free = " + cache_space_free + ", size = " + cache_size );
					
					throw( new CacheFileManagerException( null, "Cache inconsistency: sizes differ"));
				}
			}
		}finally{
			
			this_mon.exit();
		}
	
protected CacheEntryallocateCacheSpace(int entry_type, CacheFileWithCache file, DirectByteBuffer buffer, long file_position, int length)
allocates space but does NOT add it to the cache list due to synchronization issues. Basically the caller mustn't hold their monitor when calling allocate, as a flush may result in one or more other files being flushed which results in their monitor being taken, and we've got an A->B and B->A classic deadlock situation. However, we must keep the file's cache and our cache in step. It is not acceptable to have an entry inserted into our records but not in the file's as this then screws up the flush algorithm (which assumes that if it finds an entry in our list, a flush of that file is guaranteed to release space). Therefore we add the cache entry in addCacheSpace so that the caller can safely do this while synchronised firstly on its monitor and then we can sync on our. Hence we only ever get A->B monitor grabs which won't deadlock

param
file
param
buffer
param
file_position
param
length
return
throws
CacheFileManagerException

		// Loops until there is room for "length" bytes, flushing the file that owns
		// the first entry of cache_entries on each pass, then builds (but does not
		// register) the new CacheEntry. "log" remembers whether any flush was needed
		// so that the byte counters are logged once at the end.
		
		boolean	ok 	= false;
		boolean	log	= false;		
		
		while( !ok ){
			
				// musn't invoke synchronised CacheFile methods while holding manager lock as this
				// can cause deadlocks (as CacheFile calls manager methods with locks)
			
			CacheEntry	oldest_entry	= null;
			
			try{
				this_mon.enter();
			
					// room is available when the request is strictly smaller than the free
					// space, or when nothing at all is currently accounted for (free == size),
					// in which case the request is admitted whatever its length
				
				if ( length < cache_space_free || cache_space_free == cache_size ){
				
					ok	= true;
					
				}else{
					
						// first key in cache_entries' iteration order; presumably the least
						// recently used entry (cacheEntryUsed relies on "get" reordering
						// the map) - the map's construction is not visible here
					
					oldest_entry = (CacheEntry)cache_entries.keySet().iterator().next();
				}
			}finally{
				
				this_mon.exit();
			}
			
			if ( !ok ){
				
				log	= true;
				
					// note: sampled without holding this_mon, so "flushed" below is only
					// an estimate if other threads change the free space concurrently
				
				long	old_free	= cache_space_free;
			
				CacheFileWithCache	oldest_file = oldest_entry.getFile();
				
				try{
					
					oldest_file.flushCache( oldest_entry.getFilePosition(), true, cache_minimum_free_size );
					
				}catch( CacheFileManagerException e ){
					
						// if the flush failed on a file other than this one then we don't report the error here,
						// rather we tag the existing file as failed so that when it is next accessed the error
						// will be reported
					
					if ( oldest_file != file ){
												
						oldest_file.setPendingException( e );
						
					}else{
						
						throw( e );
					}
				}
				
					// space gained is measured as the change in cache_space_free
				
				long	flushed = cache_space_free - old_free;
				
				if (Logger.isEnabled()) {
					TOTorrentFile tf = file.getTorrentFile();
					TOTorrent torrent = tf == null ? null : tf.getTorrent();
					Logger.log(new LogEvent(torrent, LOGID,
							"DiskCache: cache full, flushed " + flushed + " from "
									+ oldest_file.getName()));
				}
				
				if ( flushed == 0 ){
				
						// nothing was gained: if the same entry is still first in the map
						// the loop would never make progress, so fail instead
				
					try{
						this_mon.enter();
						
						if (	cache_entries.size() > 0 &&
								(CacheEntry)cache_entries.keySet().iterator().next() == oldest_entry ){
							
								// hmm, something wrong with cache as the flush should have got rid
								// of at least the oldest entry
							
							throw( new CacheFileManagerException( null, "Cache inconsistent: 0 flushed"));
						}
					}finally{
						
						this_mon.exit();
					}
				}
			}
		}
					
			// the entry is only constructed here; the caller registers it via addCacheSpace
		
		CacheEntry	entry = new CacheEntry( entry_type, file, buffer, file_position, length );
			
		if (log && Logger.isEnabled()) {
			TOTorrentFile tf = file.getTorrentFile();
			TOTorrent torrent = tf == null ? null : tf.getTorrent();

			Logger.log(new LogEvent(torrent, LOGID, "DiskCache: cr="
					+ cache_bytes_read + ",cw=" + cache_bytes_written + ",fr="
					+ file_bytes_read + ",fw=" + file_bytes_written));
		}
			
		return( entry );
	
protected voidcacheBytesRead(int num)

		// Accounts for one read served from the cache: bumps the read counter and
		// adds num to the cached-bytes-read total, both under this_mon.
		
		try{
			this_mon.enter();
			
			cache_read_count	+= 1;
			
			cache_bytes_read	= cache_bytes_read + num;
			
		}finally{
			
			this_mon.exit();
		}
	
protected voidcacheBytesWritten(long num)

		// Accounts for one write absorbed by the cache: bumps the write counter and
		// adds num to the cached-bytes-written total, both under this_mon.
		
		try{
			this_mon.enter();
			
			cache_write_count	+= 1;
			
			cache_bytes_written	= cache_bytes_written + num;
			
		}finally{
			
			this_mon.exit();
		}
	
protected voidcacheEntryUsed(CacheEntry entry)

		try{
			this_mon.enter();
		
				// note that the "get" operation update the MRU in cache_entries
			
			if ( cache_entries.get( entry ) == null ){
				
				Debug.out( "Cache inconsistency: entry missing on usage" );
				
				throw( new CacheFileManagerException( null, "Cache inconsistency: entry missing on usage"));
				
			}else{
				
				entry.used();
			}
		}finally{
			
			this_mon.exit();
		}
	
protected voidcacheStatsAndCleaner()

		// Registers a periodic timer event (every STATS_UPDATE_FREQUENCY) that
		// refreshes the manager and per-file statistics and, every
		// CACHE_CLEANER_TICKS ticks, asks each file holding dirty entries to
		// flush data older than DIRTY_CACHE_WRITE_MAX_AGE.
			
		SimpleTimer.addPeriodicEvent(
				"CacehFile:stats",
				STATS_UPDATE_FREQUENCY,
        new TimerEventPerformer() {
          public void perform( TimerEvent ev ) {
      			
      			stats.update();      			
      			
      			// System.out.println( "cache file count = " + cache_files.size());
      								
      				// cache_files is iterated without taking this_mon; createFile never
      				// mutates this map, it builds a replacement in updated_cache_files
      			
      			Iterator	cf_it = cache_files.keySet().iterator();
      			
      			while(cf_it.hasNext()){
      				
      				((CacheFileWithCache)cf_it.next()).updateStats();
      			}
      			
      				// the cleaner pass only runs when the tick counter reaches zero
      			
      			if ( --cleaner_ticks == 0 ){
      				
      				cleaner_ticks	= CACHE_CLEANER_TICKS;
      				
      				final Set	dirty_files	= new HashSet();
      	
      					// cut-off timestamp handed to flushOldDirtyData below
      				
      				final long	oldest	=SystemTime.getCurrentTime() - DIRTY_CACHE_WRITE_MAX_AGE;
      				
      				try{
      					this_mon.enter();
      			
      						// adopt the map built by createFile since the last cleaner pass
      					
      					if ( updated_cache_files != null ){
      						
      						cache_files	= updated_cache_files;
      							
      						updated_cache_files	= null;
      					}

      						// under the lock, only collect the files that own dirty entries
      					
      					if ( cache_entries.size() > 0 ){
      						
      						Iterator it = cache_entries.keySet().iterator();
      						
      						while( it.hasNext()){
      							
      							CacheEntry	entry = (CacheEntry)it.next();
      								
      							// System.out.println( "oldest entry = " + ( now - entry.getLastUsed()));
      							
      							if ( entry.isDirty()){
      								
      								dirty_files.add( entry.getFile());
      							}
      						}
      					}
      					
      					// System.out.println( "cache file = " + cache_files.size() + ", torrent map = " + torrent_to_cache_file_map.size());
      					
      				}finally{
      					
      					this_mon.exit();
      				}
      				
      					// the flushes themselves run after this_mon has been released
      				
      				Iterator	it = dirty_files.iterator();
      				
      				while( it.hasNext()){
      					
      					CacheFileWithCache	file = (CacheFileWithCache)it.next();

      					try{
      						
      						TOTorrentFile	tf = file.getTorrentFile();
      						
      							// -1 when the file has no torrent file, otherwise the piece length
      						
      						long	min_flush_size	= -1;
      						
      						if ( tf != null ){
      							
      							min_flush_size	= tf.getTorrent().getPieceLength();
      							
      						}
      						
      						file.flushOldDirtyData( oldest, min_flush_size );
      						
      					}catch( CacheFileManagerException e ){
      						
      						file.setPendingException( e );
      						
      							// if this fails then the error should reoccur on a "proper"
      							// flush later and be reported
      						
      						Debug.printStackTrace( e );
      						
      					}catch( Throwable e ){
      						
      							// any other failure is only logged so that the remaining
      							// files are still processed
      						
      						Debug.printStackTrace( e );
      					}
      				}
      			}
      			
          }
        }
     );
		
	
protected voidcloseFile(CacheFileWithCache file)

		// Drops the file's torrent-file mapping, if it has one. The map is replaced
		// by a modified copy under this_mon rather than being mutated in place.
		
		final TOTorrentFile	torrent_file = file.getTorrentFile();
		
			// nothing to do when the file has no torrent file or is not mapped
		
		if ( torrent_file == null || torrent_to_cache_file_map.get( torrent_file ) == null ){
			
			return;
		}

		try{
			this_mon.enter();
					
			Map	replacement = new HashMap( torrent_to_cache_file_map );
			
			replacement.remove( torrent_file );

			torrent_to_cache_file_map	= replacement;
			
		}finally{
			
			this_mon.exit();
		}
	
public CacheFilecreateFile(CacheFileOwner owner, java.io.File file, int type)

		// Creates the underlying FMFile for "file" and wraps it in either a caching
		// or a non-caching CacheFile. Caching files are recorded in
		// updated_cache_files and, when they have a torrent file, in
		// torrent_to_cache_file_map. An FMFileManagerException is converted by rethrow().
		
		final long	my_id;
		
			// each file gets its own id, allocated under the lock; it is appended
			// to the owner name reported by the FMFileOwner below
			// (the original comment here was truncated: "we differentiate the")
		try{
			this_mon.enter();
			
			my_id = cache_file_id_next++;
			
		}finally{
			
			this_mon.exit();
		}
		
		try{
				// adapter exposing the cache file owner to the file manager
			
			FMFile	fm_file	= 
				file_manager.createFile(
					new FMFileOwner()
					{
						public String
						getName()
						{
							return( owner.getCacheFileOwnerName() + "[" + my_id + "]" );
						}
						public TOTorrentFile
						getTorrentFile()
						{
							return( owner.getCacheFileTorrentFile());
						}
						public File
						getControlFile(
							String	name )
						{
							return( owner.getCacheFileControlFile( name ));
						}
					}, file,
					type==CacheFile.CT_LINEAR?FMFile.FT_LINEAR:FMFile.FT_COMPACT );
				
			TOTorrentFile	tf = owner.getCacheFileTorrentFile();
			
			CacheFile	cf;
			
				// no cache for torrent files below the size threshold, when caching is
				// disabled, or when the owner explicitly opts out
			
			if (( tf != null && tf.getLength() < cache_files_not_smaller_than  ) || !cache_enabled || owner.forceNoCache()){ 
				
				cf = new CacheFileWithoutCache( this, fm_file, tf );
				
			}else{
				
				cf = new CacheFileWithCache( this, fm_file, tf );
			
				try{
					this_mon.enter();
	
						// the new file goes into a pending copy; the periodic task in
						// cacheStatsAndCleaner later installs it as cache_files
					
					if ( updated_cache_files == null ){
						
						updated_cache_files = new WeakHashMap( cache_files );
					}
						// copy on write so readers don't need to synchronize or copy
					
					updated_cache_files.put( cf, null );
										
					if ( tf != null ){
		
									
						Map	new_map = new HashMap( torrent_to_cache_file_map );
								
						new_map.put( tf, cf );
				
						torrent_to_cache_file_map	= new_map;
					}	
				}finally{
					
					this_mon.exit();
				}
			}
			
			return( cf );
			
		}catch( FMFileManagerException e ){
			
				// rethrow() always throws; the return below only satisfies the compiler
			
			rethrow( null, e );
			
			return( null );
		}
	
protected voidfileBytesRead(int num)

		// Accounts for one read that went to the file: bumps the read counter and
		// adds num to the file-bytes-read total, both under this_mon.
		
		try{
			this_mon.enter();
			
			file_read_count	+= 1;
			
			file_bytes_read	= file_bytes_read + num;
			
		}finally{
			
			this_mon.exit();
		}
	
protected voidfileBytesWritten(long num)

		// Accounts for one write that went to the file: bumps the write counter and
		// adds num to the file-bytes-written total, both under this_mon.
		
		try{
			this_mon.enter();
			
			file_write_count	+= 1;
			
			file_bytes_written	= file_bytes_written + num;
			
		}finally{
			
			this_mon.exit();
		}
	
public voidgenerate(IndentWriter writer)

		// Writes diagnostic evidence for the cache: the number of cache entries and
		// one line per distinct file that owns at least one entry (obscured name,
		// size, torrent hash and access mode).
		
		writer.println( "Cache Manager" );
			
		try{
			writer.indent();
			
				// grab a copy to avoid potential deadlock as we never take the manager monitor
				// and then the file's own monitor, always the other way around
			
			ArrayList	snapshot;
			
			try{
				this_mon.enter();

				snapshot = new ArrayList( cache_entries.keySet());

			}finally{
				
				this_mon.exit();
			}
			
				// report the size of the snapshot rather than re-reading cache_entries
				// outside the monitor, so the count always matches the entries listed
			
			writer.println( "Entries = " + snapshot.size());
							
			Set	reported = new HashSet();
			
			Iterator it = snapshot.iterator();
			
			while( it.hasNext()){
				
				CacheEntry	entry = (CacheEntry)it.next();
				
				CacheFileWithCache file = entry.getFile();
				
					// add() returns false when the file has already been reported
				
				if ( !reported.add( file )){
					
					continue;
				}
				
				TOTorrentFile torrentFile = file.getTorrentFile();
				
					// prefer the file's own length, fall back to the torrent file's
					// length when it cannot be obtained
				
				String fileLength = "";
				try {
					fileLength = "" + file.getLength();
				} catch (Exception e) {
					if (torrentFile != null)
						fileLength = "" + torrentFile.getLength();
				}
				
				String	hash = "<unknown>";
				
				try{
					if (torrentFile != null)
						hash = ByteFormatter.encodeString( torrentFile.getTorrent().getHash());
					
				}catch( Throwable ignored ){
					
						// deliberately ignored: diagnostics are best effort and the
						// hash simply stays "<unknown>"
				}
				
				String name = file.getName();

				writer.println("File: " + Debug.secretFileName(name) + ", size "
						+ fileLength + ", torrent " + hash + ", access = "
						+ file.getAccessMode());
			}
		}finally{
			
			writer.exdent();
		}
	
protected boolean[]getBytesInCache(TOTorrent torrent, long[] absoluteOffsets, long[] lengths)

		// Reports, for each chunk (absoluteOffsets[i], lengths[i]) of the torrent,
		// whether it is held in the cache. Chunks must be given in ascending,
		// non-overlapping order with positive lengths, otherwise an
		// IllegalArgumentException is thrown. Returns one flag per chunk; an empty
		// request yields an empty result.
		
			//sanity checks
		if(absoluteOffsets.length != lengths.length)
			throw new IllegalArgumentException("Offsets/Lengths mismatch");
		long prevEnding = 0;
		for(int i = 0;i<lengths.length;i++)
		{
			if(absoluteOffsets[i]<prevEnding || lengths[i] <= 0 )
				throw new IllegalArgumentException("Offsets/Lengths are not in ascending order");
			prevEnding = absoluteOffsets[i]+lengths[i];
		}
		
			// nothing requested: previously this fell through to absoluteOffsets[0]
			// and failed with an ArrayIndexOutOfBoundsException
		if(absoluteOffsets.length == 0)
			return new boolean[0];
		
			// local reference to the current map (closeFile/createFile replace the
			// map with a modified copy instead of mutating it)
		Map	map = torrent_to_cache_file_map;
		
		TOTorrentFile[]	files = torrent.getFiles();
	
		boolean[] results = new boolean[absoluteOffsets.length];
		Arrays.fill(results, true); // assume everything to be cached, then check for the opposite 
		
			// overall byte range covered by the request
		final long first = absoluteOffsets[0];
		final long last = absoluteOffsets[absoluteOffsets.length-1]+lengths[lengths.length-1];
		boolean foundFile = false;
		long fileOffset = 0;
		for (int i=0;i<files.length;i++){
			TOTorrentFile	tf = files[i];
			
			CacheFileWithCache	cache_file = (CacheFileWithCache)map.get( tf );
			
			long length = tf.getLength();
			
			long fileEnd = fileOffset+length;
			
			if(fileOffset > last || fileEnd < first)
			{
				fileOffset = fileEnd;
				continue; // no chunk falls within that file, skip it
			}
			
			foundFile = true;
			
			if(cache_file != null){
				cache_file.getBytesInCache(results,absoluteOffsets,lengths);
			}else if(length > 0){
					// we have no cache file and thus no cache entries: every chunk that
					// shares at least one byte with [fileOffset, fileEnd) is not cached.
					// A plain interval-overlap test is used so that chunks starting
					// exactly at the file start, or covering the whole file, are
					// caught too (the previous strict-inequality test missed them)
				for(int j=0;j<results.length;j++){
					long chunkStart	= absoluteOffsets[j];
					long chunkEnd	= chunkStart+lengths[j];
					
					if(chunkStart < fileEnd && chunkEnd > fileOffset)
						results[j] = false; // no file -> no cache entry
				}
			}
			
			fileOffset = fileEnd;
		}
		
			// the requested range matched no file at all
		if(!foundFile)
			Arrays.fill(results, false);
		

		return results;
    
protected longgetBytesReadFromCache()

		// running total maintained by cacheBytesRead; read without taking this_mon
		return cache_bytes_read;
	
protected longgetBytesReadFromFile()

		// running total maintained by fileBytesRead; read without taking this_mon
		return file_bytes_read;
	
protected longgetBytesWrittenToCache()

		// running total maintained by cacheBytesWritten; read without taking this_mon
		return cache_bytes_written;
	
protected longgetBytesWrittenToFile()

		// running total maintained by fileBytesWritten; read without taking this_mon
		return file_bytes_written;
	
public longgetCacheReadCount()

		// number of cacheBytesRead calls so far; read without taking this_mon
		return cache_read_count;
	
protected longgetCacheSize()

		// size handed to initialise()
		return cache_size;
	
protected longgetCacheUsed()

		long free = cache_space_free;
		
		if ( free < 0 ){
			
			free	= 0;
		}
		
		return( cache_size - free );
	
public longgetCacheWriteCount()

		// number of cacheBytesWritten calls so far; read without taking this_mon
		return cache_write_count;
	
public longgetFileReadCount()

		// number of fileBytesRead calls so far; read without taking this_mon
		return file_read_count;
	
public longgetFileWriteCount()

		// number of fileBytesWritten calls so far; read without taking this_mon
		return file_write_count;
	
public CacheFileManagerStatsgetStats()

		// the statistics object created in initialise()
		return stats;
	
protected voidinitialise(boolean enabled, boolean enable_read, boolean enable_write, long size, long not_smaller_than)

		cache_enabled			= enabled && ( enable_read || enable_write );
		
		cache_read_enabled		= enabled && enable_read;
		
		cache_write_enabled		= enabled && enable_write;
		
		cache_size				= size;
		
		cache_files_not_smaller_than	= not_smaller_than;
		
		cache_minimum_free_size	= cache_size/4;
		
		cache_space_free		= cache_size;
		
		stats = new CacheFileManagerStatsImpl( this );
		

		cacheStatsAndCleaner();
		

		if (Logger.isEnabled())
			Logger.log(new LogEvent(LOGID, "DiskCache: enabled = " + cache_enabled
					+ ", read = " + cache_read_enabled + ", write = "
					+ cache_write_enabled + ", size = " + cache_size + " B"));
	
protected booleanisCacheEnabled()

		// flag computed in initialise(): master switch and at least one of read/write
		return cache_enabled;
	
protected booleanisReadCacheEnabled()

		// flag computed in initialise(): master switch and read caching
		return cache_read_enabled;
	
protected booleanisWriteCacheEnabled()

		// flag computed in initialise(): master switch and write caching
		return cache_write_enabled;
	
protected voidreleaseCacheSpace(CacheEntry entry)

		// Releases an entry: returns its buffer to the pool, removes it from
		// cache_entries and credits its length back to cache_space_free. An entry
		// that is not registered is reported and raised as a
		// CacheFileManagerException.
		
		entry.getBuffer().returnToPool();
		
		try{
			this_mon.enter();
			
				// remove first and only credit the space once the removal is known to
				// have succeeded: crediting space for an entry that was never (or is
				// no longer) registered would push the free-space accounting further
				// out of step on exactly the path that reports an inconsistency
			
			if ( cache_entries.remove( entry ) == null ){
				
				Debug.out( "Cache inconsistency: entry missing on removal" );

				throw( new CacheFileManagerException( null, "Cache inconsistency: entry missing on removal"));
			}
			
			cache_space_free	+= entry.getLength();
			
		}finally{
			
			this_mon.exit();
		}
	
protected voidrethrow(CacheFile file, FMFileManagerException e)

		Throwable 	cause = e.getCause();
		
		if ( cause != null ){
			
			throw( new CacheFileManagerException( file, e.getMessage(), cause ));
		}
		
		throw( new CacheFileManagerException( file, e.getMessage(), e ));
	
public voidsetFileLinks(TOTorrent torrent, com.aelitis.azureus.core.util.CaseSensitiveFileMap links)

		// pure delegation: the file manager owns the torrent's file links
		file_manager.setFileLinks(torrent, links);