while (!this.stopped.get() || this.taskQueue.size() != 0) {
try {
/*
* get the future from the queue and wait until processing has
* been done
*/
Future<IndexDocument> future = getTask();
if (future != null) {
IndexDocument document = future.get();
setOptimize(document);
processDocument(document);
/*
* the document contains the info for commit or optimize -->
* this comes from the controller
*/
if (document == null || document.commitAfter())
this.indexer.commit(document == null ? false : this.optimize.getAndSet(false));
}
if (this.commit.getAndSet(false))
this.indexer.commit(this.optimize.getAndSet(false));
} catch (InterruptedException e) {
INNERLOG.warn("Queue is interrupted exiting IndexTask -- ", e);
} catch (GdataIndexerException e) {
/*
*
* TODO fire callback here as well
*/
INNERLOG.error("can not retrieve Field from IndexDocument ", e);
} catch (ExecutionException e) {
/*
* TODO callback for fail this exception is caused by an
* exception while processing the document. call back for failed
* docs should be placed here
*/
INNERLOG.error("Future throws execution exception ", e);
} catch (IOException e) {
INNERLOG.error("IOException thrown while processing document ",
e);
} catch (Throwable e) {
/*
* catch all to prevent the thread from dieing
*/
INNERLOG.error(
"Unexpected exception while processing document -- "
+ e.getMessage(), e);
}
}
try {
this.indexer.commit(this.optimize.getAndSet(false));
} catch (IOException e) {
INNERLOG.warn("commit on going down failed - "+e.getMessage(),e);
}
this.stop();