My Realm

A parallel algorithm which went down




While working on the Grid (yes, that's what I do all day), I thought, "let's parallelize the file-sharing code", and designed a decent algorithm that broke the file into pieces and hosted each piece on its own TCP server, so that multiple clients, or a single client with multiple connections to the servers, could download the file in parallel. The experiment worked but did not yield the expected results. Not only was the speed intolerably slow, it also varied from run to run. I expected it to settle down to a constant time, since it was a parallel map operation and hence O(1), but hell, it didn't. Look into the code if you want to modify it or go on a bug hunt.


package labs.amethyst
import java.io._
import java.net.ServerSocket
import scala.annotation.tailrec
/**
 * Actor that serves the file named by `Main.fileToUse` as 50000-byte chunks, one TCP listener
 * per chunk.
 *
 * Each "host" message runs one serving cycle:
 *  1. the file is read into memory as chunks (the last chunk holds only the bytes actually read);
 *  2. one `ServerSocket` per chunk is bound, starting the port search at `seed + chunkIndex`;
 *  3. the control handshake runs through the inherited `server` members (`port`, `startServer()`,
 *     `inStream`, `outStream`) and the reply `"<fileLength>-<port>:<port>:..."` is written;
 *  4. every listener hands its chunk to each connection it accepts for roughly 7 seconds;
 *  5. "host" is sent to `self` again to start the next cycle.
 *
 * NOTE(review): the name read from `inStream` is discarded and `Main.fileToUse` is always served;
 * confirm that this is intended.
 *
 * @param portNo value assigned to the inherited `port` before `startServer()` is called
 */
class FileServer(portNo: Int) extends server {
  // Base port for chunk listeners; chunk i starts probing at seed + i.
  var seed = 6000

  /**
   * Finds a port that can currently be bound by opening and immediately closing a probe socket.
   *
   * When the probe fails with a `BindException` the search moves to another port: +3000 below
   * 3000, -3000 above 10000, otherwise -1000.
   *
   * @param port first candidate port
   * @return a port that was bindable at the time of the probe
   */
  @tailrec
  private def checkPort(port: Int): Int = {
    try {
      val checkSocket = new ServerSocket(port)
      checkSocket.close()
      port
    }
    catch {
      case e: java.net.BindException =>
        if (port < 3000)
          checkPort(port + 3000)
        else if (port > 10000)
          checkPort(port - 3000)
        else
          checkPort(port - 1000)
    }
  }

  /**
   * Reads up to `size` bytes from `in`, looping because a single `read` may return fewer bytes
   * than requested.
   *
   * @return an array holding exactly the bytes read (shorter than `size` only at end of stream)
   */
  private def readChunk(in: InputStream, size: Int): Array[Byte] = {
    val buffer = new Array[Byte](size)

    @tailrec
    def fill(offset: Int): Int =
      if (offset >= size) offset
      else {
        val count = in.read(buffer, offset, size - offset)
        if (count < 0) offset else fill(offset + count)
      }

    val filled = fill(0)
    if (filled == size) buffer else java.util.Arrays.copyOf(buffer, filled)
  }

  override def preStart() = {
    self ! "host"
  }

  def receive = {
    case "host" =>
      println("called")
      val file = new File("F:\\" + Main.fileToUse)
      val packetSize = 50000
      // Ceiling division, but never fewer than one chunk so the port list is never empty.
      val numPackets = math.max(1L, (file.length() + packetSize - 1) / packetSize).toInt

      val bis = new BufferedInputStream(new FileInputStream(file))
      val chunks =
        try 0.until(numPackets).map(_ => readChunk(bis, packetSize))
        finally bis.close()

      // Bind every listener BEFORE the ports are advertised: the client may connect as soon as it
      // receives the list, and holding the sockets open keeps two chunks from resolving to the
      // same port.
      val listeners = chunks.zipWithIndex.map { case (chunk, index) =>
        val listener = new ServerSocket(checkPort(seed + index))
        listener.setSoTimeout(7000)
        listener -> chunk
      }

      try {
        val ports = listeners.map(_._1.getLocalPort).mkString(":")
        port = portNo
        startServer()
        inStream.readUTF()
        outStream.writeUTF(s"${file.length()}-$ports")
        outStream.flush()

        val threadMap = listeners.map { case (listener, chunk) =>
          new Thread {
            override def run(): Unit = {
              val boundPort = listener.getLocalPort
              try {
                val t = System.currentTimeMillis()
                while ((System.currentTimeMillis() - t) < 7000) {
                  val fileServer = listener.accept()
                  try {
                    val fileOutStream = new DataOutputStream(fileServer.getOutputStream)
                    println("Server sent on " + boundPort + " " + chunk.mkString.hashCode())
                    fileOutStream.write(chunk)
                    fileOutStream.flush()
                  }
                  finally fileServer.close()
                }
              }
              catch {
                // No client within the accept timeout: this is how the serving window normally ends.
                case _: java.net.SocketTimeoutException =>
                case scala.util.control.NonFatal(e) =>
                  println("server failed on " + boundPort + " " + e)
              }
              finally listener.close()
            }
          }
        }

        println(System.currentTimeMillis())
        threadMap.foreach(_.start())
        // join() parks this thread; polling isAlive in a loop would spin a core and compete with
        // the transfer threads.
        threadMap.foreach(_.join())
        println("ending " + System.currentTimeMillis())
      }
      finally {
        // Covers a failed handshake; closing an already closed ServerSocket is a no-op.
        listeners.foreach { case (listener, _) => listener.close() }
      }
      /*val increment=seed+packets.size+10
      seed =
      if(increment>10000)
      5000
      else
      increment*/
      self ! "host"
    case _ =>
  }
}

package labs.amethyst
import java.io._
import java.net.Socket
import akka.actor.Actor
import com.google.common.io.Files
/**
 * Actor that downloads a file in parallel, one connection per advertised chunk port.
 *
 * For an `(ip, fileName)` message it:
 *  1. connects to `ip` on port 1048, sends `fileName` and reads the reply
 *     `"<fileLength>-<port>:<port>:..."`;
 *  2. starts one thread per port, each storing the bytes it receives in
 *     `F:\Testing\<base>-<index>.grid`, where `<base>` comes from `Main.fileToUse`;
 *  3. waits for all threads, then concatenates the chunk files in index order into
 *     `F:\Prod\<fileName>` and cuts the result to the advertised length.
 *
 * If any chunk fails, the output file is not written.
 */
class FileClient extends Actor {

  /** Location of the temporary file holding chunk number `index`. */
  private def chunkFile(index: Int): File =
    new File("F:\\Testing\\" + Files.getNameWithoutExtension(Main.fileToUse) + "-" + index + ".grid")

  /** Copies `in` to `out` until end of stream, writing only the bytes each `read` returned. */
  private def copyAll(in: InputStream, out: OutputStream): Unit = {
    val buffer = new Array[Byte](50000)
    Iterator
      .continually(in.read(buffer))
      .takeWhile(_ >= 0)
      .foreach(count => out.write(buffer, 0, count))
  }

  /**
   * Downloads one chunk from `ip:port` and stores it in the chunk file for `index`.
   *
   * The stream is read until the peer closes it, because a single `read` on a socket may return
   * only part of the chunk.
   */
  private def downloadChunk(ip: String, port: Int, index: Int): Unit = {
    val fileClient = new Socket(ip, port)
    try {
      val received = new ByteArrayOutputStream(50000)
      copyAll(new DataInputStream(fileClient.getInputStream), received)
      val bytes = received.toByteArray
      val fio = new FileOutputStream(chunkFile(index))
      try fio.write(bytes)
      finally fio.close()
      println("Client received on " + port + " " + bytes.mkString.hashCode())
    }
    finally fileClient.close()
  }

  def receive = {
    case (ip: String, fileName: String) =>
      val client = new Socket(ip, 1048)
      try {
        val outStream = new DataOutputStream(client.getOutputStream)
        outStream.writeUTF(fileName)
        outStream.flush()
        val inStream = new DataInputStream(client.getInputStream)
        val inData = inStream.readUTF().split("-")
        // Long, not Int: the advertised length may exceed 2 GB.
        val fileSize = inData(0).toLong
        val ports =
          if (inData.length > 1) inData(1).split(":").map(_.toInt)
          else Array.empty[Int]

        // One slot per chunk, written only by that chunk's thread and read after join().
        val succeeded = new Array[Boolean](ports.length)
        val threadMap = ports.zipWithIndex.map {
          seed =>
            new Thread {
              override def run(): Unit = {
                setName("Thread-" + seed._2)
                try {
                  downloadChunk(ip, seed._1, seed._2)
                  succeeded(seed._2) = true
                }
                catch {
                  case scala.util.control.NonFatal(e) =>
                    println("failed on " + seed + " " + e)
                }
              }
            }
        }
        threadMap.foreach(_.start())
        // join() parks this thread; polling isAlive in a loop would spin a core and compete with
        // the download threads.
        threadMap.foreach(_.join())

        if (succeeded.forall(identity)) {
          val fio = new FileOutputStream("F:\\Prod\\" + fileName)
          try {
            // Address chunk files by index rather than listing the directory, so leftovers from
            // earlier runs and '-' characters in the file name cannot disturb the order.
            ports.indices.foreach { index =>
              val bis = new BufferedInputStream(new FileInputStream(chunkFile(index)))
              try copyAll(bis, fio)
              finally bis.close()
            }
            // Drop any padding a server added to the final chunk; no effect if already shorter.
            fio.getChannel.truncate(fileSize)
          }
          finally fio.close()
        }
        else
          println("transfer of " + fileName + " incomplete, output not assembled")
      }
      finally client.close()
    case _ => println("something")
  }
}

No comments:

Post a Comment

I am on wordpress too|No Copyright © 2014



Powered by Blogger.