Sorting Huge Files in Split Seconds
During my project, there was a requirement to sort a file containing 1 lakh (100,000) fixed-length records. I wanted to share the solution I adopted to sort it fast. The implementation was done using Spring Batch and Java NIO.
A k-way merge sort algorithm was used to sort the file. K-way merge sort follows these steps:
- Split the input file into ‘k’ chunks and write them into ‘k’ files in sorted order.
- Open the ‘k’ files, read the first line of each, and compare them. The smallest line is written to the final file, and the file pointer of the file from which the smallest line was taken is advanced to the next line.
- Repeat step 2 until all the records in all the ‘k’ files have been read through.
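The merge step above can be sketched with a min-heap, the usual way to pick the smallest head among ‘k’ sorted sources. The following is a minimal in-memory sketch (class and method names are my own, not from the project), with each list standing in for one sorted chunk file:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class KWayMerge {

    // Merge k sorted lists into one sorted list. Each heap entry is
    // [chunk index, position within that chunk], ordered by the record
    // it points at; after the smallest is taken, that chunk's pointer
    // is advanced, mirroring step 2 of the algorithm above.
    static List<String> merge(List<List<String>> chunks) {
        List<String> out = new ArrayList<>();
        PriorityQueue<int[]> heap = new PriorityQueue<>(
                Comparator.comparing((int[] e) -> chunks.get(e[0]).get(e[1])));
        for (int i = 0; i < chunks.size(); i++) {
            if (!chunks.get(i).isEmpty()) heap.add(new int[]{i, 0});
        }
        while (!heap.isEmpty()) {
            int[] e = heap.poll();
            out.add(chunks.get(e[0]).get(e[1]));
            if (e[1] + 1 < chunks.get(e[0]).size()) {
                heap.add(new int[]{e[0], e[1] + 1});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> chunks = Arrays.asList(
                Arrays.asList("apple", "mango"),
                Arrays.asList("banana", "zebra"),
                Arrays.asList("cherry"));
        System.out.println(merge(chunks)); // [apple, banana, cherry, mango, zebra]
    }
}
```

With a heap, each of the N output records costs O(log k) comparisons instead of O(k), which matters as the number of chunks grows.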
As mentioned earlier, Spring Batch is used for the implementation. Spring Batch provides the concept of chunking: a chunk of data is read from the input file, processed, and written to an output resource. Given a large input file, we need to split it into ‘k’ chunks, sort the individual records within each chunk, and write each chunk to a file named ‘chunk_k’. We use Spring Batch to process these chunks within the file.
To process fast, we need individual threads to cater to the processing of each chunk of data.
But this means the same input file is read by multiple threads. In this scenario the number of threads can be considered equal to the number of chunks the file has to be split into. In that sense, ‘k’ threads will be accessing ‘k’ chunks of data from the input file, as shown in the diagram below:
The threads are configured as below:

<beans:bean id="taskExecutor"
    class="org.springframework.core.task.SimpleAsyncTaskExecutor">
    <beans:property name="concurrencyLimit" value="10"/>
</beans:bean>
The number of chunks for the input file is calculated by dividing the size of the file (in bytes) by the chunk size (in bytes). The chunk size is set to 6760 bytes.
Now each thread is aware that it needs to read at most “chunk_size” bytes from the input file.
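The chunk count calculation is a ceiling division, so that a trailing partial chunk still gets its own thread. A small sketch (the helper name and the example line length are hypothetical; the source only fixes the 6760-byte chunk size):

```java
public class ChunkMath {

    // Number of chunks = ceiling(fileSize / chunkSize);
    // the last chunk may be shorter than chunkSize.
    public static long chunkCount(long fileSizeBytes, long chunkSizeBytes) {
        return (fileSizeBytes + chunkSizeBytes - 1) / chunkSizeBytes;
    }

    public static void main(String[] args) {
        // e.g. 1 lakh records at a hypothetical 67 bytes each = 6,700,000 bytes
        System.out.println(chunkCount(6_700_000, 6760)); // 992
    }
}
```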
But how do the threads know at what position to read from within the file? In other words, how will the threads coordinate among themselves so that each thread reads from a unique position within the file that no other thread reads? Clearly, synchronizing the read operation on the input file would hurt our scalability.
To solve this issue, we used a non-blocking ‘compare and swap’ (CAS) concurrent data structure, which gives optimistic-locking performance. Each thread spins until it wins a file position that no other thread has managed to claim. For this we use AtomicReference from the java.util.concurrent package. The following snippet depicts this:
private final AtomicReference<Byte_Chunk_Pair> Byte_Chunk_Atomic_Ref =
        new AtomicReference<Byte_Chunk_Pair>(new Byte_Chunk_Pair(0, 0));

public Byte_Chunk_Pair incrementByte_ChunkCounterAtomically() {
    Byte_Chunk_Pair prev, newValue;
    do {
        prev = Byte_Chunk_Atomic_Ref.get();
        long noOfBytes = prev.getByteTracker() + READ_SIZE;
        long noOfChunks = prev.getChunkTracker() + 1;
        newValue = new Byte_Chunk_Pair(noOfBytes, noOfChunks);
    } while (!Byte_Chunk_Atomic_Ref.compareAndSet(prev, newValue));
    return prev;
}
Now the threads individually read their own chunks from the input file using AsynchronousFileChannel from the java.nio package. This is depicted below:
public SortDTO readChunk() {
    ByteBuffer buffer = ByteBuffer.allocate(AsynchIOChannelHolder.READ_SIZE);
    // increment chunk count and byte count atomically
    Byte_Chunk_Pair previousByte_Chunk_Pair = incrementByte_ChunkCounterAtomically();
    if (previousByte_Chunk_Pair.getByteTracker() > getFileSizeinBytes()) {
        return null;
    }
    Future<Integer> future = getReadChannel().read(buffer,
            previousByte_Chunk_Pair.getByteTracker());
    SortDTO sortDTO = new SortDTO();
    sortDTO.setFileResult(future);
    sortDTO.setBuffer(buffer);
    sortDTO.setChunkTracker(previousByte_Chunk_Pair.getChunkTracker());
    sortDTO.setByteTracker(previousByte_Chunk_Pair.getByteTracker());
    return sortDTO;
}
AsynchronousFileChannel lets us read from a particular position within the file and, more importantly, it does so asynchronously. As seen in the snippet above, the read operation returns a “Future” object. Any thread, at any point during its execution, can call the “get” method on this Future to fetch the status of the data read from the file, as shown in the snippet below.
// Wait for all the writes to complete
SortIOChannelHolder channelHolder = SortIOChannelHolder.getInstance();
List<Future<Integer>> writerWorkCompletionHandles =
        channelHolder.getWriterWorkCompletionHandles();
for (Future<Integer> writeTask : writerWorkCompletionHandles) {
    try {
        writeTask.get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}
Now the read, process, and write operations of each thread run in parallel, as shown below:
We do not need to write the whole line contents into the temporary chunk_1 to chunk_k files. Instead, the following information is written for each line visited in the input file (assuming there are n values to compare in a line):

<Starting byte number for this line in the input file>,<Byte length for this line in the input file>,<Field 1 value to compare>,<Field 2 value to compare>,…,<Field n value to compare>
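Building such an index record is straightforward string assembly. A hypothetical sketch (the helper name and the 67-byte example line length are mine; only the record layout comes from the text above):

```java
public class ChunkRecord {

    // Build the index record written to a chunk file in place of the full
    // line: start byte, byte length, then the fields used for comparison.
    public static String indexRecord(long startByte, int byteLength, String... fields) {
        StringBuilder sb = new StringBuilder();
        sb.append(startByte).append(',').append(byteLength);
        for (String field : fields) {
            sb.append(',').append(field);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(indexRecord(0, 67, "SMITH", "JOHN")); // 0,67,SMITH,JOHN
    }
}
```

Keeping only the offset, length, and sort keys makes the chunk files small, and the merge phase can later pull the full line straight from the input file by position.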
For writing, we again use the asynchronous channel, as below:

SortIOChannelHolder.getInstance().getWriterWorkCompletionHandles()
        .add(channelToWrite.write(ByteBuffer.wrap(sb.toString().getBytes()), pos));
As depicted above, the write also takes the starting byte number, and it returns a Future object. All such Future objects from the different threads are collected into a list, which can later be checked for completion. Since the write happens asynchronously, each chunk’s thread saves the waiting time otherwise incurred on writing. For a thread pool with a low concurrency limit, this means a thread can be allocated the work of the next chunk (if any) sooner.
But there was still an issue. The read is done in terms of bytes, yet the entity we need to sort is a line. How will a thread safely read complete lines? What if one thread reads part of a line and another thread reads the remaining part of the same line?
To solve this, after reading its chunk, each thread converts the byte buffer into a string and splits the string by the newline character. Incomplete lines can occur only at the first and last lines of a chunk. The first line and the last line within each chunk are saved into a ‘Fragment’ object, along with the chunk number. In other words, the Fragment object contains the first line, the last line, and the chunk number. So if there are k chunks, there are k fragments. The fragment for the first chunk has an empty first line, and the fragment for the last chunk (the kth chunk) has an empty last line, because the first line of the first chunk and the last line of the last chunk are already complete lines and need not be captured as fragments.
After creating the fragment, each thread adds its Fragment object into a thread-safe priority ‘fragments queue’, an instance of PriorityBlockingQueue. The thread processing the first chunk has the additional responsibility of dequeuing the Fragment objects fed by the other chunk threads, in the order of the chunk numbers within the fragments, and constructing complete lines. A complete line is the last line of chunk n concatenated with the first line of chunk n+1. Finally, this thread writes these lines to the chunk_1 file as well, as depicted below.
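The stitching rule above is simple once the fragments are ordered by chunk number. A minimal sketch (the Fragment class here is my own stand-in for the one in the project, whose fields I am inferring from the text):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class FragmentStitcher {

    // Stand-in Fragment: the (possibly partial) first and last lines of a chunk.
    public static class Fragment {
        final int chunkNo;
        final String firstLine, lastLine;

        public Fragment(int chunkNo, String firstLine, String lastLine) {
            this.chunkNo = chunkNo;
            this.firstLine = firstLine;
            this.lastLine = lastLine;
        }
    }

    // Rebuild the lines split across chunk boundaries: the last partial line
    // of chunk n joined with the first partial line of chunk n+1.
    public static List<String> stitch(List<Fragment> fragments) {
        List<Fragment> sorted = new ArrayList<>(fragments);
        sorted.sort(Comparator.comparingInt(f -> f.chunkNo));
        List<String> complete = new ArrayList<>();
        for (int i = 0; i + 1 < sorted.size(); i++) {
            complete.add(sorted.get(i).lastLine + sorted.get(i + 1).firstLine);
        }
        return complete;
    }

    public static void main(String[] args) {
        // Fragments may arrive out of order from the chunk threads.
        List<Fragment> fragments = Arrays.asList(
                new Fragment(2, "CE", "BO"),
                new Fragment(1, "", "ALI"),
                new Fragment(3, "B", ""));
        System.out.println(stitch(fragments)); // [ALICE, BOB]
    }
}
```

In the real flow the fragments would come out of the PriorityBlockingQueue already ordered; the explicit sort here just makes the sketch self-contained.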
As mentioned earlier, the write instructions to the temporary files are asynchronous and just return Future objects, which we store in a list. After all the chunks of the input file are done, we wait on each of these Futures. This is done by registering a job listener (Spring Batch API) and implementing the ‘afterJob’ method to wait on the Future objects. This ensures that all the write operations are done before the job completes (and the JVM exits).
// Wait for all the writes to complete
SortIOChannelHolder channelHolder = SortIOChannelHolder.getInstance();
List<Future<Integer>> writerWorkCompletionHandles =
        channelHolder.getWriterWorkCompletionHandles();
for (Future<Integer> writeTask : writerWorkCompletionHandles) {
    try {
        writeTask.get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}
Now that chunk file writing is finished, we need to merge the chunk files into the final output file. This is accomplished by a single thread (the merger thread).
The files chunk_1 to chunk_k are read using the asynchronous file channel and put into a thread-safe queue. The records from the different chunk files are compared and the winning record is chosen. The winning record is parsed to fetch the starting byte number and byte length. Using this information, a different signature of the read method on the asynchronous file channel is invoked, as shown in the snippet below:
private long transferDataToOutPutFile(long posInWriterFile, SortableLineDTO sortableDTO) {
    ByteBuffer buffer = null;
    try {
        buffer = ByteBuffer.allocate((int) sortableDTO.getBytesLengthForThisLine());
    } catch (NullPointerException npe) {
        System.out.println("sortableDTO is " + sortableDTO);
        npe.printStackTrace();
    }
    BufferAndPosHolderForWritting dataHolder = new BufferAndPosHolderForWritting();
    dataHolder.setBufferToWrite(buffer);
    dataHolder.setPosToWrite(posInWriterFile);
    posInWriterFile = posInWriterFile + sortableDTO.getBytesLengthForThisLine();
    getReadChannel().read(buffer, sortableDTO.getStartingByteNumberForThisLine(), dataHolder,
            new CompletionHandler<Integer, BufferAndPosHolderForWritting>() {
                @Override
                public void completed(Integer result, BufferAndPosHolderForWritting attachment) {
                    attachment.getBufferToWrite().flip();
                    getListOfWrittingFutures().add(getOutFileWriteChannel().write(
                            attachment.getBufferToWrite(), attachment.getPosToWrite()));
                }

                @Override
                public void failed(Throwable exc, BufferAndPosHolderForWritting attachment) {
                    System.out.println("Some error occurred while writing");
                }
            });
    return posInWriterFile;
}
This signature does not return a Future. Instead, as soon as the asynchronous read completes, the same thread that completed it invokes the operation implemented in the completion handler. As can be seen above, the completion handler retrieves the bytes read from the input file and writes those same bytes into the final output file. This write operation returns a Future object, which, as for all lines, we wait on for completion after all the write tasks have been submitted. Since multiple reader threads initiate the write operation, the position to write is tallied using an AtomicLong object. This whole process is depicted below:
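Tallying the write position with an AtomicLong amounts to a getAndAdd per line: each writer atomically reserves its byte range, so concurrent writes never overlap. A small sketch (the class and method names here are hypothetical):

```java
import java.util.concurrent.atomic.AtomicLong;

public class WritePosition {

    // Shared counter handing each writer a unique start offset
    // in the output file.
    private final AtomicLong nextPos = new AtomicLong(0);

    // Atomically reserve 'byteLength' bytes; the returned value is
    // where this line's write begins.
    public long reserve(long byteLength) {
        return nextPos.getAndAdd(byteLength);
    }

    public static void main(String[] args) {
        WritePosition wp = new WritePosition();
        System.out.println(wp.reserve(10)); // 0
        System.out.println(wp.reserve(5));  // 10
    }
}
```

Because getAndAdd is a single atomic operation, no lock is needed even when many completion handlers reserve positions concurrently.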
Finally, as shown in the diagram above, we wait on the Future objects for all the line write instructions, so that the JVM exits only after all the writes are completely done.
Using this strategy, with a thread-pool concurrency limit of 10 and a chunk size of 6760 bytes, sorting 1 lakh records completed in 10 seconds.