From 213d48c0873c38afbb63abdd52236405577d8fe0 Mon Sep 17 00:00:00 2001 From: Aidan Hahn Date: Thu, 23 May 2019 22:13:57 -0700 Subject: [PATCH] lock individual processes, not the whole queue --- src/main/java/JobServ/ProcessManager.java | 168 ++++++++---------- src/test/java/JobServ/ProcessManagerTest.java | 41 +++-- .../ProcessManagerTestImplementation.java | 11 +- 3 files changed, 110 insertions(+), 110 deletions(-) diff --git a/src/main/java/JobServ/ProcessManager.java b/src/main/java/JobServ/ProcessManager.java index 5bae6f0..dd955a1 100644 --- a/src/main/java/JobServ/ProcessManager.java +++ b/src/main/java/JobServ/ProcessManager.java @@ -14,6 +14,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeoutException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.HashMap; import java.util.Iterator; @@ -26,28 +27,29 @@ import java.io.IOException; */ class ProcessManager { // TODO: LOCK_TIMEOUT should be defined in a configuration management system - private final int LOCK_TIMEOUT = 5; // seconds - private HashMap processMap; - private Boolean processQueueMutex = false; + private final int LOCK_TIMEOUT = 2; // seconds + + /* + * The significance of the concurrent hash map is that an in process + * update will not leave it in an unusable state like it will a normal + * HashMap. It is still up to the programmer in this instance to make + * sure that there are no concurrent operations done to the ProcessControllers + * Themselves. The last thing we want is to throw NPEs or whatnot when + * accessing a process destroyed mid read by another thread. + * Hence getLock(...) and lockMap controlling access to individual entries in + * processMap + */ + protected ConcurrentHashMap processMap; + protected ConcurrentHashMap lockMap; private ExecutorService threadPool = Executors.newCachedThreadPool(); - private Callable lockCallable = new Callable() { - public Object call() { - while(processQueueMutex){ - continue; // spin! - } - - processQueueMutex = true; - return 1; - } - }; - /* * Constructor * initializes process queue and start the background process checking daemon */ public ProcessManager() { - processMap = new HashMap(); + processMap = new ConcurrentHashMap(); + lockMap = new ConcurrentHashMap(); /* TODO: In a long running server over a large period of time * It is possible that the streams used to redirect IO in the * Processes may become a significant use of resources. @@ -61,39 +63,21 @@ class ProcessManager { /* * newProcess() * Takes a command and returns the translated pid of a new process - * Returns -1 if getLock fails + * Returns -1 if getLock fails TODO: REMOVE * Returns -2 if controller throws an IOException */ public int newProcess(String command) { - /* - * TRADEOFF: Could initialize new ProcessController out here - * Pro: would minimize time spent in critical section - * Con: what if initialization goes through but we dont get the lock - * we would essentially have a dangling untrackable process - * which likely changed system state before it was killed. - */ try { - // Enter critical section - this.getLock(); - ProcessController newProc = new ProcessController(command); + this.lockMap.put(newProc.getPid(), true); this.processMap.put(newProc.getPid(), newProc); - // Exit critical section - this.releaseLock(); - + this.releaseLock(newProc.getPid()); return newProc.getPid(); - } catch (TimeoutException e) { - // (lock was not grabbed) - System.err.println("Timeout starting new job: " + e.getMessage()); - return -1; - } catch (IOException e) { - // (lock was grabbed) - this.releaseLock(); - System.err.println("ProcessController couldnt start process: " + e.getMessage()); + System.err.println("Couldnt Spawn New Command: " + e.getMessage()); return -2; } } @@ -108,11 +92,10 @@ class ProcessManager { * 4: couldnt grab lock */ public int getProcessStatus(int pid) { - int status; - - // Enter critical section try { - this.getLock(); + if(!this.getLock(pid)) { + return 3; + } } catch (TimeoutException e) { // lock could not be grabbed before timeout @@ -121,15 +104,9 @@ class ProcessManager { } ProcessController candidate = this.processMap.get(pid); - if (candidate != null) { - status = candidate.getStatus(); - this.releaseLock(); - return status; - } - - // process must not exist - this.releaseLock(); - return 3; + int status = candidate.getStatus(); + this.releaseLock(pid); + return status; } /* @@ -142,11 +119,10 @@ class ProcessManager { * 259: couldnt grab lock in time */ public int getProcessReturn(int pid) { - int ret; - - // Enter Critical section try { - this.getLock(); + if(!this.getLock(pid)) { + return 258; + } } catch (TimeoutException e) { System.err.println("Timeout getting process return: " + e.getMessage()); @@ -154,14 +130,9 @@ class ProcessManager { } ProcessController candidate = this.processMap.get(pid); - if (candidate != null) { - ret = candidate.getReturn(); - this.releaseLock(); - return ret; - } - - this.releaseLock(); - return 258; + int ret = candidate.getReturn(); + this.releaseLock(pid); + return ret; } /* @@ -170,10 +141,10 @@ class ProcessManager { * or returns description of error */ public String getProcessOutput(int pid) { - String output; - try { - this.getLock(); + if(!this.getLock(pid)) { + return "[-] SERVER: Process not found"; + } } catch (TimeoutException e) { System.err.println("Timeout getting process output: " + e.getMessage()); @@ -181,14 +152,9 @@ class ProcessManager { } ProcessController candidate = this.processMap.get(pid); - if (candidate != null) { - output = candidate.getOutput(); - this.releaseLock(); - return output; - } - - this.releaseLock(); - return "[-] ERROR: Process not found"; + String output = candidate.getOutput(); + this.releaseLock(pid); + return output; } /* @@ -200,9 +166,10 @@ class ProcessManager { * returns 3 if couldnt grab lock */ public int killProcess(int pid) { - int code; try { - this.getLock(); + if(!this.getLock(pid)) { + return 2; + } } catch (TimeoutException e) { System.err.println("Timeout killing process: " + e.getMessage()); @@ -210,16 +177,9 @@ class ProcessManager { } ProcessController candidate = this.processMap.get(pid); - if (candidate != null) { - candidate.kill(); - code = 1; - - } else { - code = 2; - } - - this.releaseLock(); - return code; + candidate.kill(); + this.releaseLock(pid); + return 1; } /* @@ -228,25 +188,41 @@ class ProcessManager { * Waits for a predefined timeout period for mutex to be avail. * Synchronized so two things cannot grab lock at once. * Throws TimeoutException when it fails to get the lock. + * Alternatively, throws false if lock doesnt exist for PID + * Function is synchronized to prevent multiple threads accessing the same lock at once + * (ConcurrentHashMap will report whatever lock value was last to successfully update) */ - protected synchronized void getLock() throws TimeoutException { - Future future = this.threadPool.submit(this.lockCallable); + protected synchronized Boolean getLock(int pid) throws TimeoutException { + if (!lockMap.containsKey(pid)) { + return false; + } + + Future future = this.threadPool.submit( + new Callable() { + public Object call() { + while(lockMap.get(pid)) { + continue; // spin! + } + + lockMap.replace(pid, true); + return 1; + } + }); try { future.get(this.LOCK_TIMEOUT, TimeUnit.SECONDS); } catch (InterruptedException e) { System.err.println("[!] ERROR: " + e.getMessage()); - throw new TimeoutException(); - // rethrowing a timeout exception tells the calling process that they dont have the lock + future.cancel(true); + return false; } catch (ExecutionException e) { System.err.println("[!] ERROR: " + e.getMessage()); - throw new TimeoutException(); + future.cancel(true); + return false; // cancel the attempt to grab the lock - } finally { - future.cancel(true); } /* @@ -261,14 +237,16 @@ class ProcessManager { * mediate access to the ProcessManager * object for fresh calls as well. */ + + return true; } /* * releaseLock() * releases mutex so other threads can operate on processqueue */ - protected void releaseLock() { - this.processQueueMutex = false; + protected void releaseLock(int pid) { + this.lockMap.put(pid, false); } /* @@ -278,8 +256,6 @@ class ProcessManager { * releases resources held in the processController objects */ public void shutdown() { - this.processQueueMutex = true; - Iterator> iterator = this.processMap.entrySet().iterator(); while (iterator.hasNext()) { HashMap.Entry entry = iterator.next(); diff --git a/src/test/java/JobServ/ProcessManagerTest.java b/src/test/java/JobServ/ProcessManagerTest.java index 89b835a..d4104f1 100644 --- a/src/test/java/JobServ/ProcessManagerTest.java +++ b/src/test/java/JobServ/ProcessManagerTest.java @@ -31,10 +31,12 @@ public class ProcessManagerTest { private ProcessManagerTestImplementation manager = new ProcessManagerTestImplementation(); private ExecutorService threadPool = Executors.newCachedThreadPool(); + private int asyncTestPid; + // calls a test function that simulates load by holding the lock for a long time - private Callable holdLockSevenSeconds = new Callable() { + private Callable holdLockFourSeconds = new Callable() { public Object call() { - manager.longCallHoldsLock(); + manager.longCallHoldsLock(asyncTestPid); return true; } }; @@ -158,7 +160,7 @@ public class ProcessManagerTest { @Test public void getUnknownOutputTest() { String out = manager.getProcessOutput(532); - assertEquals("[-] ERROR: Process not found", out); + assertEquals("[-] SERVER: Process not found", out); manager.shutdown(); } @@ -187,22 +189,41 @@ public class ProcessManagerTest { */ @Test public void asyncLockTimeoutTest() { - Future future = this.threadPool.submit(this.holdLockSevenSeconds); + // start new process that will last the whole test + asyncTestPid = this.manager.newProcess("sleep 7"); + int secondProcess = this.manager.newProcess("sleep 10"); + + // grab that processes lock for 4 seconds + Future future = this.threadPool.submit(this.holdLockFourSeconds); try { - Thread.sleep(50); + Thread.sleep(100); } catch (InterruptedException e) { System.err.println("[!!] Thread for async test interrupted!"); } + // Try to grab a held lock System.err.println("[2] attempting to grab (held) lock"); - int pid = this.manager.newProcess("Test Process"); - assertEquals(-1, pid); + int status = this.manager.getProcessStatus(this.asyncTestPid); + assertEquals(4, status); // should time out after 2 secs - future.cancel(true); + // cancel the blocking thread, release lock + //future.cancel(true); - int pid2 = this.manager.newProcess("echo test"); - assertNotEquals(-1, pid2); + // try to grab unrelated lock (not nessesary, but important it works) + int statusTertiary = this.manager.getProcessStatus(secondProcess); + assertNotEquals(4, statusTertiary); + + // give lockMap small time to update + try { + Thread.sleep(100); + } catch (InterruptedException e) { + System.err.println("[!!] Thread for async test interrupted!"); + } + + // should be grabbable now + int statusSecondTry = this.manager.getProcessStatus(this.asyncTestPid); + assertNotEquals(4, statusSecondTry); manager.shutdown(); } diff --git a/src/test/java/JobServ/ProcessManagerTestImplementation.java b/src/test/java/JobServ/ProcessManagerTestImplementation.java index e9b44a7..9e879cb 100644 --- a/src/test/java/JobServ/ProcessManagerTestImplementation.java +++ b/src/test/java/JobServ/ProcessManagerTestImplementation.java @@ -17,25 +17,28 @@ import java.util.concurrent.TimeoutException; */ class ProcessManagerTestImplementation extends ProcessManager { - public void longCallHoldsLock() { + public void longCallHoldsLock(int pid) { try { - super.getLock(); + super.getLock(pid); System.err.println("[1] Long Call Has Lock"); // hold lock for 7 seconds, more than double normal timeout. Thread.sleep(4000); - super.releaseLock(); + super.releaseLock(pid); } catch (TimeoutException e) { System.err.println("[!!] Long Call wasnt able to grab lock!"); return; } catch (InterruptedException e) { - super.releaseLock(); + super.releaseLock(pid); // this doesnt happen, dont cancel this task System.err.println("[3] Released lock: interrupted"); return; } } + public Boolean reportLockState(int pid) { + return super.lockMap.get(pid); + } }