Merge branch 'raceConditions' into 'CICD'

Race conditions fixed, all tests in CI pass

See merge request whom/jobserv!5
This commit is contained in:
Aidan Hahn 2019-06-01 22:10:52 +00:00
commit e74c74b909
3 changed files with 66 additions and 72 deletions

View file

@ -8,11 +8,14 @@
package JobServ; package JobServ;
import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/* /*
* ProcessController * ProcessController
@ -33,18 +36,23 @@ class ProcessController {
private BufferedReader reader; private BufferedReader reader;
private Process process; private Process process;
private Boolean killedManually = false; private Boolean killedManually = false;
private Lock lock;
private int lockTimeout; // seconds
/* /*
* Constructor * Constructor
* Takes a command and spawns it in a new process * Takes a command and spawns it in a new process
* Redirects IO streams and assigns a fake PID * Redirects IO streams and assigns a fake PID
*/ */
public ProcessController(String command) throws IOException { public ProcessController(String command, int lockTimeout) throws IOException {
this.pid = ProcessController.nextPid; this.pid = ProcessController.nextPid;
ProcessController.nextPid += 1; ProcessController.nextPid += 1;
this.lock = new ReentrantLock();
this.lockTimeout = lockTimeout;
this.process = Runtime.getRuntime().exec(command); this.process = Runtime.getRuntime().exec(command);
this.output = this.process.getOutputStream(); this.output = this.process.getOutputStream();
this.input = this.process.getInputStream(); this.input = this.process.getInputStream();
@ -54,6 +62,28 @@ class ProcessController {
JobServServer.logger.write("Job " + String.valueOf(this.pid) + ": " + command); JobServServer.logger.write("Job " + String.valueOf(this.pid) + ": " + command);
} }
/*
* getLock()
* attempts to get the lock for lockTimeout seconds
* or throws exceptions if interrupted
*/
public boolean getLock() throws InterruptedException {
return this.lock.tryLock(this.lockTimeout, TimeUnit.SECONDS);
}
/*
* releaseLock()
* releases lock on process
*/
public void releaseLock() {
try {
this.lock.unlock();
} catch (IllegalMonitorStateException e) {
JobServServer.logger.write("Thread tried to release a lock it didnt have! " + e.getMessage());
}
}
/* /*
* getPid() * getPid()
* returns translated pid of this process * returns translated pid of this process

View file

@ -40,7 +40,6 @@ class ProcessManager {
* processMap * processMap
*/ */
protected ConcurrentHashMap<Integer, ProcessController> processMap; protected ConcurrentHashMap<Integer, ProcessController> processMap;
protected ConcurrentHashMap<Integer, Boolean> lockMap;
private ExecutorService threadPool = Executors.newCachedThreadPool(); private ExecutorService threadPool = Executors.newCachedThreadPool();
/* /*
@ -49,7 +48,6 @@ class ProcessManager {
*/ */
public ProcessManager() { public ProcessManager() {
processMap = new ConcurrentHashMap<Integer, ProcessController>(); processMap = new ConcurrentHashMap<Integer, ProcessController>();
lockMap = new ConcurrentHashMap<Integer, Boolean>();
/* TODO: In a long running server over a large period of time /* TODO: In a long running server over a large period of time
* It is possible that the streams used to redirect IO in the * It is possible that the streams used to redirect IO in the
* Processes may become a significant use of resources. * Processes may become a significant use of resources.
@ -68,11 +66,9 @@ class ProcessManager {
public int newProcess(String command) { public int newProcess(String command) {
try { try {
ProcessController newProc = new ProcessController(command); ProcessController newProc = new ProcessController(command, this.LOCK_TIMEOUT);
// we dont need to lock the map yet // we dont need to lock the map yet
this.lockMap.put(newProc.getPid(), true);
this.processMap.put(newProc.getPid(), newProc); this.processMap.put(newProc.getPid(), newProc);
this.releaseLock(newProc.getPid()); this.releaseLock(newProc.getPid());
return newProc.getPid(); return newProc.getPid();
@ -95,14 +91,13 @@ class ProcessManager {
public int getProcessStatus(int pid) { public int getProcessStatus(int pid) {
try { try {
if(!this.getLock(pid)) { if(!this.getLock(pid)) {
return 3; // lock could not be grabbed before timeout
JobServServer.logger.write("Timeout getting process status: " + String.valueOf(pid));
return 4;
} }
} catch (TimeoutException e) { } catch (IndexOutOfBoundsException e) {
// lock could not be grabbed before timeout return 3;
JobServServer.logger.write("Timeout getting process " +
String.valueOf(pid) + " status: " + e.getMessage());
return 4;
} }
ProcessController candidate = this.processMap.get(pid); ProcessController candidate = this.processMap.get(pid);
@ -123,13 +118,12 @@ class ProcessManager {
public int getProcessReturn(int pid) { public int getProcessReturn(int pid) {
try { try {
if(!this.getLock(pid)) { if(!this.getLock(pid)) {
return 258; JobServServer.logger.write("Timeout getting process return: " + String.valueOf(pid));
return 259;
} }
} catch (TimeoutException e) { } catch (IndexOutOfBoundsException e) {
JobServServer.logger.write("Timeout getting process " + return 258;
String.valueOf(pid) + " return: " + e.getMessage());
return 259;
} }
ProcessController candidate = this.processMap.get(pid); ProcessController candidate = this.processMap.get(pid);
@ -146,13 +140,12 @@ class ProcessManager {
public String getProcessOutput(int pid, int lines) { public String getProcessOutput(int pid, int lines) {
try { try {
if(!this.getLock(pid)) { if(!this.getLock(pid)) {
return "[-] SERVER: Process not found"; JobServServer.logger.write("Timeout getting process output: " + String.valueOf(pid));
return "[-] SERVER: Timeout grabbing lock to access process information";
} }
} catch (TimeoutException e) { } catch (IndexOutOfBoundsException e) {
JobServServer.logger.write("Timeout getting process " + return "[-] SERVER: Process not found";
String.valueOf(pid) + " output: " + e.getMessage());
return "[-] SERVER: Timeout grabbing lock to access process information";
} }
ProcessController candidate = this.processMap.get(pid); ProcessController candidate = this.processMap.get(pid);
@ -172,13 +165,13 @@ class ProcessManager {
public int killProcess(int pid) { public int killProcess(int pid) {
try { try {
if(!this.getLock(pid)) { if(!this.getLock(pid)) {
return 2; JobServServer.logger.write("Timeout killing process: " + String.valueOf(pid));
return 3;
} }
} catch (TimeoutException e) { } catch (IndexOutOfBoundsException e) {
JobServServer.logger.write("Timeout killing process " +
String.valueOf(pid) + ": " + e.getMessage()); return 2;
return 3;
} }
ProcessController candidate = this.processMap.get(pid); ProcessController candidate = this.processMap.get(pid);
@ -194,45 +187,20 @@ class ProcessManager {
* Function is synchronized to prevent multiple threads accessing the same lock at once * Function is synchronized to prevent multiple threads accessing the same lock at once
* (ConcurrentHashMap will report whatever lock value was last to successfully update) * (ConcurrentHashMap will report whatever lock value was last to successfully update)
*/ */
protected synchronized Boolean getLock(int pid) throws TimeoutException { protected synchronized Boolean getLock(int pid) throws IndexOutOfBoundsException {
if (!lockMap.containsKey(pid)) { ProcessController candidate = this.processMap.get(pid);
return false; if (candidate == null) {
throw new IndexOutOfBoundsException();
} }
Future<Object> future = this.threadPool.submit(
new Callable<Object>() {
public Object call() {
while(lockMap.get(pid)) {
continue; // spin!
}
lockMap.replace(pid, true);
return 1;
}
});
try { try {
future.get(this.LOCK_TIMEOUT, TimeUnit.SECONDS); Boolean success = candidate.getLock();
return success;
} catch (InterruptedException e) { } catch (InterruptedException e) {
JobServServer.logger.write("[!] Couldnt get lock " + JobServServer.logger.write("[!] Couldnt get lock " +
String.valueOf(pid) + ": "+ e.getMessage()); String.valueOf(pid) + ": "+ e.getMessage());
future.cancel(true);
// in case lock was grabbed after exception
this.releaseLock(pid);
return false; return false;
} catch (ExecutionException e) {
JobServServer.logger.write("[!] Couldnt get lock " +
String.valueOf(pid) + ": "+ e.getMessage());
future.cancel(true);
// in case lock was grabbed after exception
this.releaseLock(pid);
return false;
// cancel the attempt to grab the lock
} }
/* /*
@ -247,8 +215,6 @@ class ProcessManager {
* mediate access to the ProcessManager * mediate access to the ProcessManager
* object for fresh calls as well. * object for fresh calls as well.
*/ */
return true;
} }
/* /*
@ -256,7 +222,13 @@ class ProcessManager {
* releases mutex so other threads can operate on processqueue * releases mutex so other threads can operate on processqueue
*/ */
protected void releaseLock(int pid) { protected void releaseLock(int pid) {
this.lockMap.put(pid, false); ProcessController candidate = this.processMap.get(pid);
if (candidate == null) {
JobServServer.logger.write("Tried to release lock of process that doesnt exist!");
return;
}
candidate.releaseLock();
} }
/* /*

View file

@ -27,18 +27,10 @@ class ProcessManagerTestImplementation extends ProcessManager {
super.releaseLock(pid); super.releaseLock(pid);
} catch (TimeoutException e) {
System.err.println("[!!] Long Call wasnt able to grab lock!");
return;
} catch (InterruptedException e) { } catch (InterruptedException e) {
super.releaseLock(pid); // this doesnt happen, dont cancel this task super.releaseLock(pid); // this doesnt happen, dont cancel this task
System.err.println("[3] Released lock: interrupted"); System.err.println("[3] Released lock: interrupted");
return; return;
} }
} }
public Boolean reportLockState(int pid) {
return super.lockMap.get(pid);
}
} }