Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
573 views
in Technique[技术] by (71.8m points)

scala - Play 2.x : Reactive file upload with Iteratees

I will start with the question: How to use Scala API's Iteratee to upload a file to the cloud storage (Azure Blob Storage in my case, but I don't think it's most important now)

Background:

I need to chunk the input into blocks of about 1 MB for storing large media files (300 MB+) as an Azure's BlockBlobs. Unfortunately, my Scala knowledge is still poor (my project is Java based and the only use for Scala in it will be an Upload controller).

I tried with this code: Why makes calling error or done in a BodyParser's Iteratee the request hang in Play Framework 2.0? (as a Input Iteratee) - it works quite well but eachElement that I could use has size of 8192 bytes, so it's too small for sending some hundred megabyte files to the cloud.

I must say that's quite a new approach to me, and most probably I misunderstood something (don't want to tell that I misunderstood everything ;> )

I will appreciate any hint or link, which will help me with that topic. If is there any sample of similar usage it would be the best option for me to get the idea.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

Basically what you need first is rechunk input as bigger chunks, 1024 * 1024 bytes.

First let's have an Iteratee that will consume up to 1m of bytes (ok to have the last chunk smaller)

val consumeAMB = 
  Traversable.takeUpTo[Array[Byte]](1024*1024) &>> Iteratee.consume()

Using that, we can construct an Enumeratee (adapter) that will regroup chunks, using an API called grouped:

val rechunkAdapter:Enumeratee[Array[Byte],Array[Byte]] =
  Enumeratee.grouped(consumeAMB)

Here grouped uses an Iteratee to determine how much to put in each chunk. It uses the our consumeAMB for that. Which means the result is an Enumeratee that rechunks input into Array[Byte] of 1MB.

Now we need to write the BodyParser, which will use the Iteratee.foldM method to send each chunk of bytes:

val writeToStore: Iteratee[Array[Byte],_] =
  Iteratee.foldM[Array[Byte],_](connectionHandle){ (c,bytes) => 
    // write bytes and return next handle, probable in a Future
  }

foldM passes a state along and uses it in its passed function (S,Input[Array[Byte]]) => Future[S] to return a new Future of state. foldM will not call the function again until the Future is completed and there is an available chunk of input.

And the body parser will be rechunking input and pushing it into the store:

BodyParser( rh => (rechunkAdapter &>> writeToStore).map(Right(_)))

Returning a Right indicates that you are returning a body by the end of the body parsing (which happens to be the handler here).


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...