lock individual processes, not the whole queue
This commit is contained in:
parent
4cb9d3a5e1
commit
213d48c087
3 changed files with 110 additions and 110 deletions
|
|
@ -14,6 +14,7 @@ import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
|
@ -26,28 +27,29 @@ import java.io.IOException;
|
||||||
*/
|
*/
|
||||||
class ProcessManager {
|
class ProcessManager {
|
||||||
// TODO: LOCK_TIMEOUT should be defined in a configuration management system
|
// TODO: LOCK_TIMEOUT should be defined in a configuration management system
|
||||||
private final int LOCK_TIMEOUT = 5; // seconds
|
private final int LOCK_TIMEOUT = 2; // seconds
|
||||||
private HashMap<Integer, ProcessController> processMap;
|
|
||||||
private Boolean processQueueMutex = false;
|
/*
|
||||||
|
* The significance of the concurrent hash map is that an in process
|
||||||
|
* update will not leave it in an unusable state like it will a normal
|
||||||
|
* HashMap. It is still up to the programmer in this instance to make
|
||||||
|
* sure that there are no concurrent operations done to the ProcessControllers
|
||||||
|
* Themselves. The last thing we want is to throw NPEs or whatnot when
|
||||||
|
* accessing a process destroyed mid read by another thread.
|
||||||
|
* Hence getLock(...) and lockMap controlling access to individual entries in
|
||||||
|
* processMap
|
||||||
|
*/
|
||||||
|
protected ConcurrentHashMap<Integer, ProcessController> processMap;
|
||||||
|
protected ConcurrentHashMap<Integer, Boolean> lockMap;
|
||||||
private ExecutorService threadPool = Executors.newCachedThreadPool();
|
private ExecutorService threadPool = Executors.newCachedThreadPool();
|
||||||
|
|
||||||
private Callable<Object> lockCallable = new Callable<Object>() {
|
|
||||||
public Object call() {
|
|
||||||
while(processQueueMutex){
|
|
||||||
continue; // spin!
|
|
||||||
}
|
|
||||||
|
|
||||||
processQueueMutex = true;
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Constructor
|
* Constructor
|
||||||
* initializes process queue and start the background process checking daemon
|
* initializes process queue and start the background process checking daemon
|
||||||
*/
|
*/
|
||||||
public ProcessManager() {
|
public ProcessManager() {
|
||||||
processMap = new HashMap<Integer, ProcessController>();
|
processMap = new ConcurrentHashMap<Integer, ProcessController>();
|
||||||
|
lockMap = new ConcurrentHashMap<Integer, Boolean>();
|
||||||
/* TODO: In a long running server over a large period of time
|
/* TODO: In a long running server over a large period of time
|
||||||
* It is possible that the streams used to redirect IO in the
|
* It is possible that the streams used to redirect IO in the
|
||||||
* Processes may become a significant use of resources.
|
* Processes may become a significant use of resources.
|
||||||
|
|
@ -61,39 +63,21 @@ class ProcessManager {
|
||||||
/*
|
/*
|
||||||
* newProcess()
|
* newProcess()
|
||||||
* Takes a command and returns the translated pid of a new process
|
* Takes a command and returns the translated pid of a new process
|
||||||
* Returns -1 if getLock fails
|
* Returns -1 if getLock fails TODO: REMOVE
|
||||||
* Returns -2 if controller throws an IOException
|
* Returns -2 if controller throws an IOException
|
||||||
*/
|
*/
|
||||||
public int newProcess(String command) {
|
public int newProcess(String command) {
|
||||||
/*
|
|
||||||
* TRADEOFF: Could initialize new ProcessController out here
|
|
||||||
* Pro: would minimize time spent in critical section
|
|
||||||
* Con: what if initialization goes through but we dont get the lock
|
|
||||||
* we would essentially have a dangling untrackable process
|
|
||||||
* which likely changed system state before it was killed.
|
|
||||||
*/
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Enter critical section
|
|
||||||
this.getLock();
|
|
||||||
|
|
||||||
ProcessController newProc = new ProcessController(command);
|
ProcessController newProc = new ProcessController(command);
|
||||||
|
this.lockMap.put(newProc.getPid(), true);
|
||||||
this.processMap.put(newProc.getPid(), newProc);
|
this.processMap.put(newProc.getPid(), newProc);
|
||||||
|
|
||||||
// Exit critical section
|
this.releaseLock(newProc.getPid());
|
||||||
this.releaseLock();
|
|
||||||
|
|
||||||
return newProc.getPid();
|
return newProc.getPid();
|
||||||
|
|
||||||
} catch (TimeoutException e) {
|
|
||||||
// (lock was not grabbed)
|
|
||||||
System.err.println("Timeout starting new job: " + e.getMessage());
|
|
||||||
return -1;
|
|
||||||
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
// (lock was grabbed)
|
System.err.println("Couldnt Spawn New Command: " + e.getMessage());
|
||||||
this.releaseLock();
|
|
||||||
System.err.println("ProcessController couldnt start process: " + e.getMessage());
|
|
||||||
return -2;
|
return -2;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -108,11 +92,10 @@ class ProcessManager {
|
||||||
* 4: couldnt grab lock
|
* 4: couldnt grab lock
|
||||||
*/
|
*/
|
||||||
public int getProcessStatus(int pid) {
|
public int getProcessStatus(int pid) {
|
||||||
int status;
|
|
||||||
|
|
||||||
// Enter critical section
|
|
||||||
try {
|
try {
|
||||||
this.getLock();
|
if(!this.getLock(pid)) {
|
||||||
|
return 3;
|
||||||
|
}
|
||||||
|
|
||||||
} catch (TimeoutException e) {
|
} catch (TimeoutException e) {
|
||||||
// lock could not be grabbed before timeout
|
// lock could not be grabbed before timeout
|
||||||
|
|
@ -121,17 +104,11 @@ class ProcessManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
ProcessController candidate = this.processMap.get(pid);
|
ProcessController candidate = this.processMap.get(pid);
|
||||||
if (candidate != null) {
|
int status = candidate.getStatus();
|
||||||
status = candidate.getStatus();
|
this.releaseLock(pid);
|
||||||
this.releaseLock();
|
|
||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
|
|
||||||
// process must not exist
|
|
||||||
this.releaseLock();
|
|
||||||
return 3;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* getProcessReturn()
|
* getProcessReturn()
|
||||||
* returns:
|
* returns:
|
||||||
|
|
@ -142,11 +119,10 @@ class ProcessManager {
|
||||||
* 259: couldnt grab lock in time
|
* 259: couldnt grab lock in time
|
||||||
*/
|
*/
|
||||||
public int getProcessReturn(int pid) {
|
public int getProcessReturn(int pid) {
|
||||||
int ret;
|
|
||||||
|
|
||||||
// Enter Critical section
|
|
||||||
try {
|
try {
|
||||||
this.getLock();
|
if(!this.getLock(pid)) {
|
||||||
|
return 258;
|
||||||
|
}
|
||||||
|
|
||||||
} catch (TimeoutException e) {
|
} catch (TimeoutException e) {
|
||||||
System.err.println("Timeout getting process return: " + e.getMessage());
|
System.err.println("Timeout getting process return: " + e.getMessage());
|
||||||
|
|
@ -154,26 +130,21 @@ class ProcessManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
ProcessController candidate = this.processMap.get(pid);
|
ProcessController candidate = this.processMap.get(pid);
|
||||||
if (candidate != null) {
|
int ret = candidate.getReturn();
|
||||||
ret = candidate.getReturn();
|
this.releaseLock(pid);
|
||||||
this.releaseLock();
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
this.releaseLock();
|
|
||||||
return 258;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* getProcessOutput()
|
* getProcessOutput()
|
||||||
* returns output of process 'pid'
|
* returns output of process 'pid'
|
||||||
* or returns description of error
|
* or returns description of error
|
||||||
*/
|
*/
|
||||||
public String getProcessOutput(int pid) {
|
public String getProcessOutput(int pid) {
|
||||||
String output;
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
this.getLock();
|
if(!this.getLock(pid)) {
|
||||||
|
return "[-] SERVER: Process not found";
|
||||||
|
}
|
||||||
|
|
||||||
} catch (TimeoutException e) {
|
} catch (TimeoutException e) {
|
||||||
System.err.println("Timeout getting process output: " + e.getMessage());
|
System.err.println("Timeout getting process output: " + e.getMessage());
|
||||||
|
|
@ -181,16 +152,11 @@ class ProcessManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
ProcessController candidate = this.processMap.get(pid);
|
ProcessController candidate = this.processMap.get(pid);
|
||||||
if (candidate != null) {
|
String output = candidate.getOutput();
|
||||||
output = candidate.getOutput();
|
this.releaseLock(pid);
|
||||||
this.releaseLock();
|
|
||||||
return output;
|
return output;
|
||||||
}
|
}
|
||||||
|
|
||||||
this.releaseLock();
|
|
||||||
return "[-] ERROR: Process not found";
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* killProcess()
|
* killProcess()
|
||||||
* returns mirror processStatus
|
* returns mirror processStatus
|
||||||
|
|
@ -200,9 +166,10 @@ class ProcessManager {
|
||||||
* returns 3 if couldnt grab lock
|
* returns 3 if couldnt grab lock
|
||||||
*/
|
*/
|
||||||
public int killProcess(int pid) {
|
public int killProcess(int pid) {
|
||||||
int code;
|
|
||||||
try {
|
try {
|
||||||
this.getLock();
|
if(!this.getLock(pid)) {
|
||||||
|
return 2;
|
||||||
|
}
|
||||||
|
|
||||||
} catch (TimeoutException e) {
|
} catch (TimeoutException e) {
|
||||||
System.err.println("Timeout killing process: " + e.getMessage());
|
System.err.println("Timeout killing process: " + e.getMessage());
|
||||||
|
|
@ -210,16 +177,9 @@ class ProcessManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
ProcessController candidate = this.processMap.get(pid);
|
ProcessController candidate = this.processMap.get(pid);
|
||||||
if (candidate != null) {
|
|
||||||
candidate.kill();
|
candidate.kill();
|
||||||
code = 1;
|
this.releaseLock(pid);
|
||||||
|
return 1;
|
||||||
} else {
|
|
||||||
code = 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
this.releaseLock();
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
@ -228,25 +188,41 @@ class ProcessManager {
|
||||||
* Waits for a predefined timeout period for mutex to be avail.
|
* Waits for a predefined timeout period for mutex to be avail.
|
||||||
* Synchronized so two things cannot grab lock at once.
|
* Synchronized so two things cannot grab lock at once.
|
||||||
* Throws TimeoutException when it fails to get the lock.
|
* Throws TimeoutException when it fails to get the lock.
|
||||||
|
* Alternatively, throws false if lock doesnt exist for PID
|
||||||
|
* Function is synchronized to prevent multiple threads accessing the same lock at once
|
||||||
|
* (ConcurrentHashMap will report whatever lock value was last to successfully update)
|
||||||
*/
|
*/
|
||||||
protected synchronized void getLock() throws TimeoutException {
|
protected synchronized Boolean getLock(int pid) throws TimeoutException {
|
||||||
Future<Object> future = this.threadPool.submit(this.lockCallable);
|
if (!lockMap.containsKey(pid)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<Object> future = this.threadPool.submit(
|
||||||
|
new Callable<Object>() {
|
||||||
|
public Object call() {
|
||||||
|
while(lockMap.get(pid)) {
|
||||||
|
continue; // spin!
|
||||||
|
}
|
||||||
|
|
||||||
|
lockMap.replace(pid, true);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
try {
|
try {
|
||||||
future.get(this.LOCK_TIMEOUT, TimeUnit.SECONDS);
|
future.get(this.LOCK_TIMEOUT, TimeUnit.SECONDS);
|
||||||
|
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
System.err.println("[!] ERROR: " + e.getMessage());
|
System.err.println("[!] ERROR: " + e.getMessage());
|
||||||
throw new TimeoutException();
|
future.cancel(true);
|
||||||
// rethrowing a timeout exception tells the calling process that they dont have the lock
|
return false;
|
||||||
|
|
||||||
} catch (ExecutionException e) {
|
} catch (ExecutionException e) {
|
||||||
System.err.println("[!] ERROR: " + e.getMessage());
|
System.err.println("[!] ERROR: " + e.getMessage());
|
||||||
throw new TimeoutException();
|
future.cancel(true);
|
||||||
|
return false;
|
||||||
|
|
||||||
// cancel the attempt to grab the lock
|
// cancel the attempt to grab the lock
|
||||||
} finally {
|
|
||||||
future.cancel(true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
@ -261,14 +237,16 @@ class ProcessManager {
|
||||||
* mediate access to the ProcessManager
|
* mediate access to the ProcessManager
|
||||||
* object for fresh calls as well.
|
* object for fresh calls as well.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* releaseLock()
|
* releaseLock()
|
||||||
* releases mutex so other threads can operate on processqueue
|
* releases mutex so other threads can operate on processqueue
|
||||||
*/
|
*/
|
||||||
protected void releaseLock() {
|
protected void releaseLock(int pid) {
|
||||||
this.processQueueMutex = false;
|
this.lockMap.put(pid, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
@ -278,8 +256,6 @@ class ProcessManager {
|
||||||
* releases resources held in the processController objects
|
* releases resources held in the processController objects
|
||||||
*/
|
*/
|
||||||
public void shutdown() {
|
public void shutdown() {
|
||||||
this.processQueueMutex = true;
|
|
||||||
|
|
||||||
Iterator<HashMap.Entry<Integer, ProcessController>> iterator = this.processMap.entrySet().iterator();
|
Iterator<HashMap.Entry<Integer, ProcessController>> iterator = this.processMap.entrySet().iterator();
|
||||||
while (iterator.hasNext()) {
|
while (iterator.hasNext()) {
|
||||||
HashMap.Entry<Integer, ProcessController> entry = iterator.next();
|
HashMap.Entry<Integer, ProcessController> entry = iterator.next();
|
||||||
|
|
|
||||||
|
|
@ -31,10 +31,12 @@ public class ProcessManagerTest {
|
||||||
private ProcessManagerTestImplementation manager = new ProcessManagerTestImplementation();
|
private ProcessManagerTestImplementation manager = new ProcessManagerTestImplementation();
|
||||||
private ExecutorService threadPool = Executors.newCachedThreadPool();
|
private ExecutorService threadPool = Executors.newCachedThreadPool();
|
||||||
|
|
||||||
|
private int asyncTestPid;
|
||||||
|
|
||||||
// calls a test function that simulates load by holding the lock for a long time
|
// calls a test function that simulates load by holding the lock for a long time
|
||||||
private Callable<Object> holdLockSevenSeconds = new Callable<Object>() {
|
private Callable<Object> holdLockFourSeconds = new Callable<Object>() {
|
||||||
public Object call() {
|
public Object call() {
|
||||||
manager.longCallHoldsLock();
|
manager.longCallHoldsLock(asyncTestPid);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
@ -158,7 +160,7 @@ public class ProcessManagerTest {
|
||||||
@Test
|
@Test
|
||||||
public void getUnknownOutputTest() {
|
public void getUnknownOutputTest() {
|
||||||
String out = manager.getProcessOutput(532);
|
String out = manager.getProcessOutput(532);
|
||||||
assertEquals("[-] ERROR: Process not found", out);
|
assertEquals("[-] SERVER: Process not found", out);
|
||||||
manager.shutdown();
|
manager.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -187,22 +189,41 @@ public class ProcessManagerTest {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void asyncLockTimeoutTest() {
|
public void asyncLockTimeoutTest() {
|
||||||
Future<Object> future = this.threadPool.submit(this.holdLockSevenSeconds);
|
// start new process that will last the whole test
|
||||||
|
asyncTestPid = this.manager.newProcess("sleep 7");
|
||||||
|
int secondProcess = this.manager.newProcess("sleep 10");
|
||||||
|
|
||||||
|
// grab that processes lock for 4 seconds
|
||||||
|
Future<Object> future = this.threadPool.submit(this.holdLockFourSeconds);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Thread.sleep(50);
|
Thread.sleep(100);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
System.err.println("[!!] Thread for async test interrupted!");
|
System.err.println("[!!] Thread for async test interrupted!");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Try to grab a held lock
|
||||||
System.err.println("[2] attempting to grab (held) lock");
|
System.err.println("[2] attempting to grab (held) lock");
|
||||||
int pid = this.manager.newProcess("Test Process");
|
int status = this.manager.getProcessStatus(this.asyncTestPid);
|
||||||
assertEquals(-1, pid);
|
assertEquals(4, status); // should time out after 2 secs
|
||||||
|
|
||||||
future.cancel(true);
|
// cancel the blocking thread, release lock
|
||||||
|
//future.cancel(true);
|
||||||
|
|
||||||
int pid2 = this.manager.newProcess("echo test");
|
// try to grab unrelated lock (not nessesary, but important it works)
|
||||||
assertNotEquals(-1, pid2);
|
int statusTertiary = this.manager.getProcessStatus(secondProcess);
|
||||||
|
assertNotEquals(4, statusTertiary);
|
||||||
|
|
||||||
|
// give lockMap small time to update
|
||||||
|
try {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
System.err.println("[!!] Thread for async test interrupted!");
|
||||||
|
}
|
||||||
|
|
||||||
|
// should be grabbable now
|
||||||
|
int statusSecondTry = this.manager.getProcessStatus(this.asyncTestPid);
|
||||||
|
assertNotEquals(4, statusSecondTry);
|
||||||
|
|
||||||
manager.shutdown();
|
manager.shutdown();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -17,25 +17,28 @@ import java.util.concurrent.TimeoutException;
|
||||||
*/
|
*/
|
||||||
class ProcessManagerTestImplementation extends ProcessManager {
|
class ProcessManagerTestImplementation extends ProcessManager {
|
||||||
|
|
||||||
public void longCallHoldsLock() {
|
public void longCallHoldsLock(int pid) {
|
||||||
try {
|
try {
|
||||||
super.getLock();
|
super.getLock(pid);
|
||||||
System.err.println("[1] Long Call Has Lock");
|
System.err.println("[1] Long Call Has Lock");
|
||||||
|
|
||||||
// hold lock for 7 seconds, more than double normal timeout.
|
// hold lock for 7 seconds, more than double normal timeout.
|
||||||
Thread.sleep(4000);
|
Thread.sleep(4000);
|
||||||
|
|
||||||
super.releaseLock();
|
super.releaseLock(pid);
|
||||||
|
|
||||||
} catch (TimeoutException e) {
|
} catch (TimeoutException e) {
|
||||||
System.err.println("[!!] Long Call wasnt able to grab lock!");
|
System.err.println("[!!] Long Call wasnt able to grab lock!");
|
||||||
return;
|
return;
|
||||||
|
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
super.releaseLock();
|
super.releaseLock(pid); // this doesnt happen, dont cancel this task
|
||||||
System.err.println("[3] Released lock: interrupted");
|
System.err.println("[3] Released lock: interrupted");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Boolean reportLockState(int pid) {
|
||||||
|
return super.lockMap.get(pid);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue