FileDocCategorySizeDatePackage
PlatformMessenger.javaAPI DocAzureus 3.0.3.415048Fri Jun 08 16:49:10 BST 2007com.aelitis.azureus.core.messenger

PlatformMessenger

public class PlatformMessenger extends Object
author
TuxPaper
created
Sep 25, 2006

Fields Summary
private static boolean
USE_HTTP_POST
public static String
REPLY_EXCEPTION
public static String
REPLY_ACTION
public static String
REPLY_RESULT
private static Map
mapQueue
private static AEMonitor
queue_mon
private static org.gudy.azureus2.core3.util.Timer
timerProcess
private static TimerEvent
timerEvent
private static boolean
initialized
private static fakeContext
context
Constructors Summary
Methods Summary
public static voiddebug(java.lang.String string)

param
string

		AEDiagnosticsLogger diag_logger = AEDiagnostics.getLogger("v3.PMsgr");
		diag_logger.log(string);
		if (Constants.DIAG_TO_STDOUT) {
			System.out.println(Thread.currentThread().getName() + "|"
					+ System.currentTimeMillis() + "] " + string);
		}
	
private static byte[]downloadURL(org.gudy.azureus2.plugins.PluginInterface pi, java.net.URL url, java.lang.String postData)

		ResourceDownloaderFactory rdf = pi.getUtilities().getResourceDownloaderFactory();

		ResourceDownloader rd = rdf.create(url, postData);

		rd = rdf.getRetryDownloader(rd, 3);
		// We could report percentage to listeners, but there's no need to atm
		//		rd.addListener(new ResourceDownloaderListener() {
		//		
		//			public void reportPercentComplete(ResourceDownloader downloader,
		//					int percentage) {
		//			}
		//		
		//			public void reportActivity(ResourceDownloader downloader, String activity) {
		//			}
		//		
		//			public void failed(ResourceDownloader downloader,
		//					ResourceDownloaderException e) {
		//			}
		//		
		//			public boolean completed(ResourceDownloader downloader, InputStream data) {
		//				return true;
		//			}
		//		});

		InputStream is = rd.download();

		byte data[];
		
		try{
			int length = is.available();
	
			data = new byte[length];
	
			is.read(data);

		}finally{
			
			is.close();
		}
		
		return (data);
	
public static ClientMessageContextgetClientMessageContext()

		if (!initialized) {
			init();
		}
		return context;
	
public static synchronized voidinit()


	     
		if (initialized) {
			return;
		}
		initialized = true;

		context = new fakeContext();
		context.addMessageListener(new TorrentListener());
		context.addMessageListener(new DisplayListener(null));
		context.addMessageListener(new ConfigListener(null));
	
protected static voidprocessQueue()

		if (!initialized) {
			init();
		}

		final Map mapProcessing = new HashMap();

		queue_mon.enter();
		try {
			mapProcessing.putAll(mapQueue);
			mapQueue.clear();
		} finally {
			queue_mon.exit();
		}
		debug("about to process " + mapProcessing.size());
		
		if (mapProcessing.size() == 0) {
			return;
		}

		String urlStem = "";
		long sequenceNo = 0;
		for (Iterator iter = mapProcessing.keySet().iterator(); iter.hasNext();) {
			PlatformMessage message = (PlatformMessage) iter.next();
			message.setSequenceNo(sequenceNo);

			if (sequenceNo > 0) {
				urlStem += "&";
			}
			try {
				urlStem += "cmd="
						+ URLEncoder.encode(BrowserMessage.MESSAGE_PREFIX
								+ BrowserMessage.MESSAGE_DELIM + sequenceNo
								+ BrowserMessage.MESSAGE_DELIM + message.getListenerID()
								+ BrowserMessage.MESSAGE_DELIM + message.getOperationID()
								+ BrowserMessage.MESSAGE_DELIM
								+ message.getParameters().toString(), "UTF-8");
			} catch (UnsupportedEncodingException e) {
			}

			PlatformMessengerListener listener = (PlatformMessengerListener) mapProcessing.get(message);
			if (listener != null) {
				listener.messageSent(message);
			}
			sequenceNo++;
		}

		String sURL;
		String sPostData = null;
		if (USE_HTTP_POST) {
			sURL = Constants.URL_PREFIX + Constants.URL_POST_PLATFORM_MESSAGE;
			sPostData = Constants.URL_POST_PLATFORM_DATA + "&" + urlStem + "&"
					+ Constants.URL_SUFFIX + "\n";
			debug("POST: " + sURL + "?" + sPostData);
		} else {
			sURL = Constants.URL_PREFIX + Constants.URL_PLATFORM_MESSAGE + "&"
					+ urlStem + "&" + Constants.URL_SUFFIX;
			debug("GET: " + sURL);
		}

		final String fURL = sURL;
		final String fPostData = sPostData;

		AEThread thread = new AEThread("v3.PlatformMessenger", true) {
			public void runSupport() {
				try {
					processQueueAsync(fURL, fPostData, mapProcessing);
				} catch (Exception e) {
					Debug.out("Error while sending message(s) to Platform", e);
					for (Iterator iter = mapProcessing.keySet().iterator(); iter.hasNext();) {
						PlatformMessage message = (PlatformMessage) iter.next();
						PlatformMessengerListener l = (PlatformMessengerListener) mapProcessing.get(message);
						if (l != null) {
							try {
								HashMap map = new HashMap();
								map.put("text", e.toString());
								map.put("Throwable", e);
								l.replyReceived(message, REPLY_EXCEPTION, map);
							} catch (Exception e2) {
								Debug.out("Error while sending replyReceived", e2);
							}
						}
					}
				}
			}
		};
		thread.start();
	
protected static voidprocessQueueAsync(java.lang.String sURL, java.lang.String sData, java.util.Map mapProcessing)

param
mapProcessing
param
surl
throws
Exception

		URL url;
		url = new URL(sURL);
		AzureusCore core = AzureusCoreFactory.getSingleton();
		final PluginInterface pi = core.getPluginManager().getDefaultPluginInterface();

		byte[] bytes = downloadURL(pi, url, sData);
		String s = new String(bytes, "UTF8");

		// Format: <sequence no> ; <classification> [; <results>] [ \n ]

		if (s == null || s.length() == 0 || !Character.isDigit(s.charAt(0))) {
			Debug.out("Error while sending message(s) to Platform: reply: " + s
					+ "\nurl: " + sURL + "\nPostData: " + sData);
			for (Iterator iter = mapProcessing.keySet().iterator(); iter.hasNext();) {
				PlatformMessage message = (PlatformMessage) iter.next();
				PlatformMessengerListener l = (PlatformMessengerListener) mapProcessing.get(message);
				if (l != null) {
					try {
						HashMap map = new HashMap();
						map.put("text", "result was " + s);
						l.replyReceived(message, REPLY_EXCEPTION, map);
					} catch (Exception e2) {
						Debug.out("Error while sending replyReceived" + "\nurl: " + sURL
								+ "\nPostData: " + sData, e2);
					}
				}
			}
			return;
		}

		Map mapSeqToBrowserMsg = new HashMap();

		String[] replies = s.split("\\n");
		for (int i = 0; i < replies.length; i++) {
			String reply = replies[i];

			final String[] replySections = reply.split(BrowserMessage.MESSAGE_DELIM,
					3);
			if (replySections.length < 2) {
				continue;
			}
			long sequenceNo = NumberFormat.getInstance().parse(replySections[0]).longValue();

			Map actionResults = null;

			if (replySections.length == 3) {
				try {
					actionResults = JSONUtils.decodeJSON(replySections[2]);
				} catch (Throwable e) {
					Debug.out("Error while sending message(s) to Platform: reply: " + s
							+ "\nurl: " + sURL + "\nPostData: " + sData, e);
				}
			}

			// Find PlatformMessage associated with sequence
			// TODO: There's a better way to do this
			PlatformMessage message = null;
			PlatformMessengerListener listener = null;
			for (Iterator iter = mapProcessing.keySet().iterator(); iter.hasNext();) {
				PlatformMessage potentialMessage = (PlatformMessage) iter.next();
				if (potentialMessage.getSequenceNo() == sequenceNo) {
					message = potentialMessage;
					listener = (PlatformMessengerListener) mapProcessing.get(message);
				}
			}

			if (message == null) {
				Debug.out("No message with sequence number " + sequenceNo);
				continue;
			}

			debug("Got a reply! " + reply + "\n\t for " + message.toString());

			final PlatformMessage fMessage = message;
			final PlatformMessengerListener fListener = listener;
			final Map fActionResults = actionResults;

			// test
			if (i == 0 && false) {
				replySections[1] = "action";
				actionResults = new JSONObject();
				actionResults.put("retry-client-message", new Boolean(true));
				JSONArray a = new JSONArray();
				a.add("[AZMSG;1;display;open-url;{\"url\":\"http://yahoo.com\",\"width\":500,\"height\":200}]");
				actionResults.put("messages", a);
			}

			// Todo check array [1] for reply type

			if (replySections[1].equals("action")) {
				if (actionResults instanceof Map) {
					final boolean bRetry = MapUtils.getMapBoolean(actionResults, "retry-client-message", false);
					
					List array = (List) MapUtils.getMapObject(actionResults, "messages",
							null, List.class);
					if (actionResults.containsKey("messages")) {
						for (int j = 0; j < array.size(); j++) {
							final String sMsg = (String) array.get(j);
							debug("handling (" + ((bRetry) ? " with retry" : " no retry")
									+ "): " + sMsg);

							final BrowserMessage browserMsg = new BrowserMessage(sMsg);
							int seq = browserMsg.getSequence();
							BrowserMessage existingBrowserMsg = (BrowserMessage) mapSeqToBrowserMsg.get(new Long(
									seq));
							if (existingBrowserMsg != null) {
								existingBrowserMsg.addCompletionListener(new MessageCompletionListener() {
									public void completed(boolean success, Object data) {
										debug("got complete for " + sMsg);
										if (success) {
											queueMessage(fMessage, fListener);
										} else {
											if (fListener != null) {
												try {
													fListener.replyReceived(fMessage, replySections[1],
															fActionResults);
												} catch (Exception e2) {
													Debug.out("Error while sending replyReceived", e2);
												}
											}
										}
									}
								});
								continue;
							}

							if (bRetry) {
								mapSeqToBrowserMsg.put(new Long(seq), browserMsg);

								browserMsg.addCompletionListener(new MessageCompletionListener() {
									public void completed(boolean success, Object data) {
										debug("got complete for " + sMsg + ";" + success);
										if (success) {
											queueMessage(fMessage, fListener);
										} else {
											if (fListener != null) {
												try {
													fListener.replyReceived(fMessage, replySections[1],
															fActionResults);
												} catch (Exception e2) {
													Debug.out("Error while sending replyReceived", e2);
												}
											}
										}
									}
								});
							}

							new AEThread("v3.Msg.Dispatch") {
								public void runSupport() {
									context.getMessageDispatcher().dispatch(browserMsg);
								}
							}.start();
						}
					}
					if (bRetry) {
						continue;
					}
				}
			}

			if (listener != null) {
				try {
					listener.replyReceived(message, replySections[1], actionResults);
				} catch (Exception e2) {
					Debug.out("Error while sending replyReceived", e2);
				}
			}
		}
		context.getMessageDispatcher().resetSequence();
	
public static voidqueueMessage(PlatformMessage message, PlatformMessengerListener listener)

		if (!initialized) {
			init();
		}

		debug("q msg " + message + " for " + new Date(message.getFireBefore()));
		queue_mon.enter();
		try {
			mapQueue.put(message, listener);

			if (timerEvent == null || timerEvent.hasRun()) {
				timerEvent = timerProcess.addEvent(message.getFireBefore(),
						new TimerEventPerformer() {
							public void perform(TimerEvent event) {
								timerEvent = null;
								processQueue();
							}
						});
			} else {
				// Move the time up if we have to
				if (message.getFireBefore() < timerEvent.getWhen()) {
					timerProcess.adjustAllBy(message.getFireBefore()
							- timerEvent.getWhen());
				}
			}
		} finally {
			queue_mon.exit();
		}