FileDocCategorySizeDatePackage
GDataIndexer.javaAPI DocApache Lucene 2.1.017205Wed Feb 14 10:46:06 GMT 2007org.apache.lucene.gdata.search.index

GDataIndexer

public class GDataIndexer extends Object
A GDataIndexer encapsulates every writing access to the search index.

Inserts, updates and deletes to the index happen inside this class. All modifications are based on an instance of {@link org.apache.lucene.gdata.search.index.IndexDocument} which contains all information and commands for the indexer.
Although this class provides methods to add, remove and update documents in the index, all IndexDocument instances should be added to the task queue via the {@link GDataIndexer#addIndexableDocumentTask(Future)} method. Inside this class runs an instance of {@link org.apache.lucene.gdata.search.index.IndexTask} listening on this queue. The analysis of the actual documents happens inside the {@link java.util.concurrent.Future} object added to the queue. This enables the indexer to concentrate on its actual work: documents are built / analyzed concurrently while already finished tasks can be added to the index.

author
Simon Willnauer

Fields Summary
private static final Log
LOG
protected IndexWriter
writer
protected IndexSearcher
searcher
protected AtomicInteger
committed
protected AtomicInteger
optimized
private AtomicBoolean
isDestroyed
protected AtomicInteger
docsAdded
protected AtomicInteger
docsUpdated
protected AtomicInteger
docsDeleted
private final Directory
dir
private final List
listeners
protected final BlockingQueue
futurQueue
private final org.apache.lucene.gdata.search.config.IndexSchema
serviceConfiguration
private final ExecutorService
indexTaskExecutor
protected IndexTask
indexTask
private static final Integer
ZERO
private static final Integer
ONE
private final Map
action
int[]
documentNumber
Constructors Summary
protected GDataIndexer(org.apache.lucene.gdata.search.config.IndexSchema schema, Directory dir, boolean create)


         
                
        // Reject missing collaborators before any index resource is opened.
        if (schema == null) {
            throw new IllegalArgumentException("IndexServiceConfiguration must not be null");
        }
        if (dir == null) {
            throw new IllegalArgumentException("IndexDirectory must not be null");
        }

        this.dir = dir;
        this.serviceConfiguration = schema;
        // openWriter reads both fields assigned above, so it must run after them.
        openWriter(create);
        // Pending per-document actions, applied on the next commit.
        this.action = new HashMap<IndexDocument, Integer>(128);
        // Single worker thread that runs the index task.
        this.indexTaskExecutor = Executors.newSingleThreadExecutor();

    
Methods Summary
protected synchronized voidaddDocument(IndexDocument indexable)

        if (!indexable.isInsert())
            throw new GdataIndexerException(
                    "Index action must be set to insert");
        setAction(indexable);
        doWrite(indexable);
        this.docsAdded.incrementAndGet();

    
public voidaddIndexableDocumentTask(java.util.concurrent.Future task)
Adds the given future task to the queue, and waits if the queue is full. The queue size is set to 100 by default.

param
task - the task to be scheduled
throws
InterruptedException - if the calling thread is interrupted while waiting for space in the queue

        if (this.isDestroyed.get())
            throw new IllegalStateException(
                    "Indexer has already been destroyed");
        this.futurQueue.put(task);
    
protected voidcloseSearcher()

        // Nothing to release when no searcher is open.
        if (this.searcher == null) {
            return;
        }
        try {
            this.searcher.close();
        } finally {
            // Drop the reference even when close() fails.
            this.searcher = null;
        }
    
protected voidcloseWriter()

        // Nothing to release when no writer is open.
        if (this.writer == null) {
            return;
        }
        try {
            this.writer.close();
        } finally {
            // Drop the reference even when close() fails.
            this.writer = null;
        }
    
protected synchronized voidcommit(boolean optimize)
This method commits all changes to the index and closes all open resources (e.g. IndexWriter and IndexReader). This method notifies all registered Commit listeners if invoked.

param
optimize - true if the index should be optimized on this commit
throws
IOException - if an IOException occurs

        if (LOG.isInfoEnabled()) {
            LOG.info("Commit called with optimize = " + optimize);
        }

        final int pendingChanges = this.docsAdded.get()
                + this.docsDeleted.get()
                + this.docsUpdated.get();

        // Nothing changed since the last commit: return without notifying the
        // listeners, so searchers are not closed and reopened needlessly.
        if (pendingChanges == 0) {
            return;
        }

        // Statistics are updated before the actual work is done.
        this.committed.incrementAndGet();
        if (optimize) {
            this.optimized.incrementAndGet();
        }

        // Apply pending deletes and remove duplicates first.
        doDeltete();

        if (optimize) {
            closeSearcher();
            openWriter();
            this.writer.optimize();
        }

        // Release both index resources before resetting the counters.
        closeSearcher();
        closeWriter();

        this.docsAdded.set(0);
        this.docsDeleted.set(0);
        this.docsUpdated.set(0);

        final String serviceName = this.serviceConfiguration.getName();
        notifyCommitListeners(serviceName);

    
public static synchronized org.apache.lucene.gdata.search.index.GDataIndexercreateGdataIndexer(org.apache.lucene.gdata.search.config.IndexSchema config, org.apache.lucene.store.Directory dir, boolean create)
This factory method creates a new GDataIndexer using an instance of {@link IndexTask}

param
config - the config to be used to configure the indexer
param
dir - the directory to index to
param
create - true to create a new index, false to use the existing one.
return
- a new GDataIndexer instance
throws
IOException - if an IOException occurs while initializing the indexer

        final GDataIndexer indexer = new GDataIndexer(config, dir, create);
        // Wire a plain IndexTask to the indexer's own queue, then start it.
        final IndexTask queueWorker = new IndexTask(indexer, indexer.futurQueue);
        indexer.setIndexTask(queueWorker);
        indexer.init();
        return indexer;
    
public static synchronized org.apache.lucene.gdata.search.index.GDataIndexercreateTimedGdataIndexer(org.apache.lucene.gdata.search.config.IndexSchema config, org.apache.lucene.store.Directory dir, boolean create, long commitTimeout)
This factory method creates a new GDataIndexer using an instance of {@link TimedIndexTask}. This indexer will automatically commit the index if no modification to the index occurs for the given time. The time unit used is {@link TimeUnit#SECONDS}. Values less than the default value will be ignored. For the default value see {@link TimedIndexTask}.

param
config - the config to be used to configure the indexer
param
dir - the directory to index to
param
create - true to create a new index, false to use the existing one.
param
commitTimeout - the amount of seconds to wait until a commit should be scheduled
return
- a new GDataIndexer instance
throws
IOException - if an IOException occurs while initializing the indexer


        final GDataIndexer indexer = new GDataIndexer(config, dir, create);
        // Wire a TimedIndexTask (with the requested commit timeout) to the
        // indexer's own queue, then start it.
        final IndexTask queueWorker = new TimedIndexTask(indexer,
                indexer.futurQueue, commitTimeout);
        indexer.setIndexTask(queueWorker);
        indexer.init();
        return indexer;
    
protected synchronized voiddeleteDocument(IndexDocument indexable)

        if (!indexable.isDelete())
            throw new GdataIndexerException(
                    "Index action must be set to delete");

        setAction(indexable);
        this.docsDeleted.incrementAndGet();
    
protected synchronized voiddestroy()

        this.isDestroyed.set(true);
        if (!this.indexTask.isStopped())
            this.indexTask.stop();
        this.futurQueue.add(new FinishingFuture());
        this.indexTaskExecutor.shutdown();
        closeWriter();
        closeSearcher();
        if (LOG.isInfoEnabled())
            LOG.info("Destroying GdataIndexer for service -- "
                    + this.serviceConfiguration.getName());

    
protected voiddoDeltete()

        // Nothing recorded since the last run: leave writer and searcher untouched.
        if (this.action.size() == 0)
            return;
        if (LOG.isInfoEnabled())
            LOG
                    .info("Deleting documents and duplicates from index, size of IndexDocuments "
                            + this.action.size());
        // Deletes go through the searcher's IndexReader, so the writer is closed first.
        closeWriter();
        openSearcher();

        IndexReader reader = this.searcher.getIndexReader();
        TermDocs termDocs = reader.termDocs();
        try {
            for (Map.Entry<IndexDocument, Integer> entry : this.action.entrySet()) {
                IndexDocument indexDocument = entry.getKey();
                // number of matching documents that must survive (0 = delete all)
                Integer docToKeep = entry.getValue();
                // extend the array if needed
                if (this.documentNumber == null
                        || docToKeep > this.documentNumber.length)
                    this.documentNumber = new int[docToKeep];

                // -1 marks a slot that holds no document number yet
                for (int i = 0; i < this.documentNumber.length; i++) {
                    this.documentNumber[i] = -1;
                }
                /*
                 * get the term to find the document from the document itself
                 */
                termDocs.seek(indexDocument.getDeletealbe());

                int pos = 0;

                while (termDocs.next()) {
                    /*
                     * if this is a pure delete just delete it and continue
                     */
                    if (docToKeep == 0) {
                        reader.deleteDocument(termDocs.doc());
                        continue;
                    }

                    /*
                     * documentNumber is used as a ring buffer of size docToKeep:
                     * the slot is overwritten with the current match and the
                     * document it held before is deleted, so only the last
                     * docToKeep matches remain in the index.
                     */
                    int prev = this.documentNumber[pos];
                    this.documentNumber[pos] = termDocs.doc();
                    if (prev != -1) {
                        reader.deleteDocument(prev);
                    }

                    if (++pos >= docToKeep)
                        pos = 0;

                }
            }
        } finally {
            // release the term enumerator, which was previously never closed
            termDocs.close();
        }
        /*
         * clear the map after all documents are processed
         */
        this.action.clear();
        closeSearcher();
    
protected voiddoWrite(IndexDocument document)

        // Close any open searcher before making sure a writer is available.
        // NOTE(review): presumably reader and writer must not hold the index
        // at the same time -- confirm.
        closeSearcher();
        openWriter();
        // Add the writeable form supplied by the IndexDocument to the index.
        this.writer.addDocument(document.getWriteable());

    
protected voidinit()

        if (this.indexTask == null)
            this.indexTask = new IndexTask(this, this.futurQueue);
        this.indexTaskExecutor.execute(this.indexTask);

    
protected voidnotifyCommitListeners(java.lang.String serviceId)

        if (LOG.isInfoEnabled())
            LOG.info("notify commit event listeners for service id: "
                    + serviceId + " --  current size of registered listeners: "
                    + this.listeners.size());
        for (IndexEventListener listener : this.listeners) {
            listener.commitCallBack(serviceId);
        }
    
protected voidopenSearcher()

        // Reuse the searcher that is already open.
        if (this.searcher != null) {
            return;
        }
        this.searcher = new IndexSearcher(this.dir);
    
protected voidopenWriter()

        openWriter(false);
    
private voidopenWriter(boolean create)

        // Reuse the writer that is already open; 'create' is ignored in that case.
        if (this.writer != null) {
            return;
        }
        this.writer = new GDataIndexWriter(this.dir, create, this.serviceConfiguration);
    
public voidregisterIndexEventListener(IndexEventListener listener)
Registers a new IndexEventListener. All registered listeners will be notified if the index has been committed.

param
listener - the listener to register

        if (listener == null || this.listeners.contains(listener))
            return;
        this.listeners.add(listener);
    
public voidremoveIndexEventListener(IndexEventListener listener)
Removes a registered IndexEventListener

param
listener - the listener to remove


        if (listener == null || !this.listeners.contains(listener))
            return;
        this.listeners.remove(listener);
    
private voidsetAction(IndexDocument doc)

        final Integer pending = this.action.get(doc);
        final boolean nothingRecorded = pending == null;

        if (doc.isDelete()) {
            /*
             * A delete keeps ZERO documents for this IndexDocument. It is
             * recorded when no action exists yet or when an earlier action
             * wanted to keep documents.
             */
            if (nothingRecorded || pending.intValue() > 0) {
                this.action.put(doc, ZERO);
            }
        } else {
            /*
             * Any other action keeps ONE document for this IndexDocument. It
             * is recorded when no action exists yet or when an earlier delete
             * set the count to zero. doDeltete keeps the last match found and
             * deletes the rest, i.e. all duplicates.
             */
            if (nothingRecorded || pending.intValue() == 0) {
                this.action.put(doc, ONE);
            }
        }
    
protected voidsetIndexTask(IndexTask task)

        // The task can be assigned only once; null and later assignments are ignored.
        if (task == null || this.indexTask != null) {
            return;
        }
        this.indexTask = task;
    
protected synchronized voidupdateDocument(IndexDocument indexable)

        if (!indexable.isUpdate())
            throw new GdataIndexerException(
                    "Index action must be set to update");
        setAction(indexable);
        doWrite(indexable);
        this.docsUpdated.incrementAndGet();