While working on the Grid(yes that's what I do all day) I thought lets parallelize the file sharing code and designed a decent algorithm which broke the file into pieces, hosted each piece on a TCP server, and multiple clients could download them or a single client with multiple connections on the servers could download the file in a parallel fashion. The experiment worked but did not yield expected results. Not only the speed was intolerably slow but was varying. I expected it to settle down on a constant time since it was a parallel map operation and hence O(1) but hell it didn't. Look into the code if you want to modify or want to go on a bug hunt.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package labs.amethyst | |
import java.io._ | |
import java.net.ServerSocket | |
import scala.annotation.tailrec | |
class FileServer(portNo: Int) extends server { | |
var seed = 6000 | |
@tailrec | |
private def checkPort(port: Int): Int = { | |
try { | |
val checkSocket = new ServerSocket(port) | |
checkSocket.close() | |
port | |
} | |
catch { | |
case e: java.net.BindException => | |
if (port < 3000) | |
checkPort(port + 3000) | |
else if (port > 10000) | |
checkPort(port - 3000) | |
else | |
checkPort(port - 1000) | |
} | |
} | |
override def preStart() = { | |
self ! "host" | |
} | |
def receive = { | |
case "host" => | |
println("called") | |
val file=new File("F:\\"+Main.fileToUse) | |
val packetSize=50000 | |
val numPackets=(file.length()/packetSize).toInt + 1 | |
val bis = new BufferedInputStream(new FileInputStream(file)) | |
val portList = 0.until(numPackets).map{ | |
index=> | |
val tempBuffer=new Array[Byte](packetSize) | |
bis.read(tempBuffer) | |
checkPort(seed+index)->tempBuffer | |
} | |
val ports=portList.unzip._1.mkString(":") | |
port = portNo | |
startServer() | |
inStream.readUTF() | |
outStream.writeUTF(file.length() + "-" + ports) | |
outStream.flush() | |
val threadMap = portList.map { | |
index => | |
new Thread { | |
override def run(): Unit = { | |
try { | |
val fileServerSocket = new ServerSocket(index._1) | |
fileServerSocket.setSoTimeout(7000) | |
val t = System.currentTimeMillis() | |
while ((System.currentTimeMillis() - t) < 7000) { | |
val fileServer = fileServerSocket.accept() | |
val fileOutStream = new DataOutputStream(fileServer.getOutputStream) | |
println("Server sent on " + index._1 + " " + index._2.mkString.hashCode()) | |
fileOutStream.write(index._2) | |
fileOutStream.flush() | |
fileOutStream.close() | |
fileServer.close() | |
} | |
fileServerSocket.close() | |
} | |
catch { | |
case e: Exception => | |
//println("server failed on "+index._1+" "+e) | |
} | |
} | |
} | |
} | |
println(System.currentTimeMillis()) | |
threadMap.foreach(_.start()) | |
while (threadMap.map(_.isAlive).contains(true)) { | |
} | |
println("ending " + System.currentTimeMillis()) | |
/*val increment=seed+packets.size+10 | |
seed = | |
if(increment>10000) | |
5000 | |
else | |
increment*/ | |
self ! "host" | |
case _ => | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package labs.amethyst | |
import java.io._ | |
import java.net.Socket | |
import akka.actor.Actor | |
import com.google.common.io.Files | |
class FileClient extends Actor { | |
def receive = { | |
case (ip: String, fileName: String) => | |
val client = new Socket(ip, 1048) | |
val outStream = new DataOutputStream(client.getOutputStream) | |
outStream.writeUTF(fileName) | |
outStream.flush() | |
val inStream = new DataInputStream(client.getInputStream) | |
val inData = inStream.readUTF().split("-") | |
val (fileSize, ports) = (Integer.parseInt(inData(0)), inData(1).split(":").map(Integer.parseInt)) | |
val threadMap = ports.zipWithIndex.map { | |
seed => | |
new Thread { | |
override def run() = { | |
setName("Thread-" + seed._2) | |
try { | |
val fileClient = new Socket(ip, seed._1) | |
val fileInStream = new DataInputStream(fileClient.getInputStream) | |
val tempBuffer = new Array[Byte](50000) | |
fileInStream.read(tempBuffer) | |
val fio = new FileOutputStream("F:\\Testing\\" + Files.getNameWithoutExtension(Main.fileToUse)+"-"+seed._2+".grid") | |
fio.write(tempBuffer) | |
fio.close() | |
fileInStream.close() | |
fileClient.close() | |
println("Client received on "+seed._1+" "+tempBuffer.mkString.hashCode()) | |
} | |
catch { | |
case e: Exception => | |
println("failed on " + seed + " " + e) | |
} | |
} | |
} | |
} | |
threadMap.foreach(_.start()) | |
while (threadMap.map(_.isAlive).contains(true)) { | |
} | |
val fio = new FileOutputStream("F:\\Prod\\"+fileName) | |
Sys.getListOfFiles("F:\\Testing") | |
.filter(file=>file.getName.contains(Files.getNameWithoutExtension(fileName))&& file.getName.endsWith(".grid")) | |
.sortBy(file=>Integer.parseInt(file.getName.split("-")(1).split(".grid")(0))) | |
.foreach{ | |
file=> | |
val bis = new BufferedInputStream(new FileInputStream(file)) | |
val tempBuffer=new Array[Byte](50000) | |
bis.read(tempBuffer) | |
fio.write(tempBuffer) | |
bis.close() | |
} | |
fio.close() | |
/*println("client"+System.currentTimeMillis()) | |
val finalData=Sys.getListOfFiles("F:\\Testing").map(file=>Main.getContents(file.getPath)).mkString | |
Base64.decodeToFile(finalData,"F:\\Testing\\"+fileName)*/ | |
//println(System.currentTimeMillis()) | |
case _ => println("something") | |
} | |
} |
No comments:
Post a Comment