So, I thought of three ways of accessing files and set out to see which one is fastest: the first attempts to read/write and handles the resulting exceptions, the second uses a queue with worker threads, and the third avoids touching a file at all while it is held in a read/write lock dictionary (multiple concurrent reads are allowed, but only one write).
At a later stage I added the dictionary check to the queue to make it more efficient: if a specific file is currently being written, a second attempt on that file is re-queued so that other files can be cleared out of the queue first.
The results below are in files per second, for 50k files (1,000 random iterations over a range of 100 files), averaged over 10 executions.
Action | Threads | OS Locking | Queued 1 | Queued 2 | Avoid |
--- | --- | --- | --- | --- | --- |
Read | 1 | 9775 | 1000 | 990 | 10193 |
Read | 5 | 31887 | 15019 | 14560 | 31585 |
Read | 10 | 38639 | 15669 | 10660 | 36886 |
Write | 1 | 3017 | 1022 | 993 | 2791 |
Write | 5 | 5532 | 4857 | 4766 | 5413 |
Write | 10 | 5378 | 4866 | 4625 | 5339 |
Read/Write | 1 | 4504 | 1050 | 995 | 5173 |
Read/Write | 5 | 10477 | 6934 | 7711 | 10324 |
Read/Write | 10 | 9742 | 7867 | 7824 | 10773 |
The results are pretty straightforward. OS Locking uses exceptions to handle collisions, which makes it slower with a single thread, but it is faster overall because it has none of the overhead that the other approaches add.
I wrote two queued classes: one invokes a method for each queued item, while the other runs in a continuous loop. The continuous loop is faster.
Overall, avoiding OS-level file access performs about the same as relying on the "file in use" exception when accessing the same file, except when file operations take a long time. This is probably because the exception-based method keeps retrying every 1 ms after a "file in use" exception, wasting CPU instead of waiting quietly for the file to be unlocked.
It's been an interesting test for me. I thought the queues would do a much faster job, since the OS would only need to handle reading and writing files sequentially; I was wrong, and it was good to find out.
I'm including all the source code for the tests; I have a feeling the DictionaryLock and QueuedExecution classes will find better uses in the future.
The DictionaryLock uses a ConcurrentDictionary and a SpinLock to perform read/write locking on keys. The SpinLock ensures that only one thread at a time can insert new locks into the dictionary.
/// <summary>
/// Dictionary of reader/writer locks keyed by <typeparamref name="TKey"/>.
/// <para>Each distinct key gets its own <see cref="ReaderWriterLockSlim"/>, created on first use
/// and kept for the lifetime of this instance (this class never removes locks).</para>
/// </summary>
/// <typeparam name="TKey">Type of key</typeparam>
public class DictionaryLock<TKey>
{
    /// <summary>
    /// Per-key lock container.
    /// </summary>
    private readonly ConcurrentDictionary<TKey, ReaderWriterLockSlim> _locks = new ConcurrentDictionary<TKey, ReaderWriterLockSlim>();

    /// <summary>
    /// Serializes creation of new per-key locks so two threads can never publish
    /// different lock instances for the same key.
    /// <para>Must NOT be readonly: SpinLock is a mutable struct, and a readonly field would make
    /// every Enter/Exit operate on a defensive copy.</para>
    /// </summary>
    private SpinLock _operationlock = new SpinLock();

    /// <summary>
    /// Retrieves the ReaderWriterLockSlim for a specific key, creating it on first use.
    /// </summary>
    /// <param name="key">Key whose lock is requested.</param>
    /// <returns>The single lock instance associated with <paramref name="key"/>.</returns>
    private ReaderWriterLockSlim GetLock(TKey key)
    {
        // Fast path: the lock already exists.
        ReaderWriterLockSlim existing;
        if (_locks.TryGetValue(key, out existing))
        {
            return existing;
        }

        bool lockTaken = false;
        try
        {
            _operationlock.Enter(ref lockTaken);

            // Re-check under the spin lock: another thread may have created the lock for this
            // key between the TryGetValue above and acquiring _operationlock.
            if (!_locks.TryGetValue(key, out existing))
            {
                existing = new ReaderWriterLockSlim();
                _locks[key] = existing;
            }
            return existing;
        }
        finally
        {
            // Release in finally so an exception inside the critical section cannot leave the
            // spin lock held forever (which would make every later new-key GetLock spin).
            if (lockTaken)
            {
                _operationlock.Exit();
            }
        }
    }

    /// <summary>
    /// Enter Reader lock on key (blocks until acquired).
    /// </summary>
    /// <param name="key">Key to lock.</param>
    public void EnterReader(TKey key)
    {
        GetLock(key).EnterReadLock();
    }

    /// <summary>
    /// Enter Writer lock on key (blocks until acquired).
    /// </summary>
    /// <param name="key">Key to lock.</param>
    public void EnterWriter(TKey key)
    {
        GetLock(key).EnterWriteLock();
    }

    /// <summary>
    /// Returns whether the CALLING thread currently holds the read lock on <paramref name="key"/>
    /// (see <see cref="ReaderWriterLockSlim.IsReadLockHeld"/>). Other threads' locks are not reported.
    /// Returns false if no lock was ever created for the key.
    /// </summary>
    public bool IsReaderLocked(TKey key)
    {
        ReaderWriterLockSlim existing;
        if (_locks.TryGetValue(key, out existing))
        {
            return existing.IsReadLockHeld;
        }
        return false;
    }

    /// <summary>
    /// Returns whether the CALLING thread currently holds the write lock on <paramref name="key"/>
    /// (see <see cref="ReaderWriterLockSlim.IsWriteLockHeld"/>). Other threads' locks are not reported.
    /// Returns false if no lock was ever created for the key.
    /// </summary>
    public bool IsWriterLocked(TKey key)
    {
        ReaderWriterLockSlim existing;
        if (_locks.TryGetValue(key, out existing))
        {
            return existing.IsWriteLockHeld;
        }
        return false;
    }

    /// <summary>
    /// Exit Reader lock on key.
    /// <para>No-op if no lock was ever created for the key; otherwise ReaderWriterLockSlim throws
    /// SynchronizationLockException if the calling thread does not hold the read lock.</para>
    /// </summary>
    public void ExitReader(TKey key)
    {
        ReaderWriterLockSlim existing;
        if (_locks.TryGetValue(key, out existing))
        {
            existing.ExitReadLock();
        }
    }

    /// <summary>
    /// Exit Writer lock on key.
    /// <para>No-op if no lock was ever created for the key; otherwise ReaderWriterLockSlim throws
    /// SynchronizationLockException if the calling thread does not hold the write lock.</para>
    /// </summary>
    public void ExitWriter(TKey key)
    {
        ReaderWriterLockSlim existing;
        if (_locks.TryGetValue(key, out existing))
        {
            existing.ExitWriteLock();
        }
    }
}
The QueuedExecution is an abstract class that provides an easy way to implement queued object handling. It uses a ManualResetEventSlim to notify the caller when it has finished processing the request. It could use better exception handling; I've done the minimum for this test project.
/// <summary>
/// Abstract Queued Execution
/// <para>Provides infrastructure for executing IItems through the ProcessQueue override
/// on a fixed number of worker threads, defined in the constructor.</para>
/// </summary>
public abstract class QueuedExecution : IDisposable
{
    /// <summary>
    /// Process Result, returned by ProcessQueue
    /// </summary>
    protected enum ProcessResult { Success, FailThrow, FailRequeue }

    /// <summary>
    /// Item interface
    /// </summary>
    protected interface IItem { }

    /// <summary>
    /// Queue Item container
    /// </summary>
    private class QueueItem
    {
        /// <summary>
        /// IItem
        /// </summary>
        public IItem Item { get; set; }

        /// <summary>
        /// Result of ProcessQueue
        /// </summary>
        public ProcessResult ProcessResult { get; set; }

        /// <summary>
        /// ManualResetEvent for pinging back the waiting call
        /// </summary>
        public ManualResetEventSlim resetEvent { get; set; }

        /// <summary>
        /// Exception thrown by ProcessQueue for this item, if any; surfaced to the caller as InnerException.
        /// </summary>
        public Exception Error { get; set; }
    }

    /// <summary>
    /// Queue containing all the items for execution
    /// </summary>
    private readonly ConcurrentQueue<QueueItem> _queue = new ConcurrentQueue<QueueItem>();

    /// <summary>
    /// Process Queue method, should be overriden in inheriting class
    /// </summary>
    /// <param name="item">item to be executed against</param>
    /// <returns>success/fail/requeue</returns>
    protected abstract ProcessResult ProcessQueue(IItem item);

    /// <summary>
    /// Number of threads to process queue
    /// </summary>
    private int _threadcount = 1;

    /// <summary>
    /// Threads array
    /// </summary>
    private Thread[] _threads;

    /// <summary>
    /// flag, should abort all executing threads.
    /// <para>volatile so worker threads are guaranteed to observe the write made by Dispose;
    /// a plain field read inside a loop may be hoisted and never re-read.</para>
    /// </summary>
    private volatile bool _threadaborted = false;

    /// <summary>
    /// Initializes and starts the worker threads.
    /// <para>NOTE(review): threads start from the base constructor, before the derived constructor
    /// has run; safe only as long as nothing is enqueued until construction completes.</para>
    /// </summary>
    private void Initialize()
    {
        _threads = new Thread[_threadcount];
        for (var i = 0; i < _threadcount; i++)
        {
            _threads[i] = new Thread(new ThreadStart(WorkerLoop));
        }
        for (var i = 0; i < _threadcount; i++)
        {
            _threads[i].Start();
        }
    }

    /// <summary>
    /// Worker body: dequeues items, runs ProcessQueue, re-queues on FailRequeue and otherwise
    /// signals the waiting caller. Polls every 1 ms when the queue is empty, and exits once
    /// _threadaborted is set (checked after each iteration).
    /// </summary>
    private void WorkerLoop()
    {
        do
        {
            QueueItem item;
            if (!_queue.TryDequeue(out item))
            {
                Thread.Sleep(1);
                continue;
            }

            try
            {
                item.ProcessResult = ProcessQueue(item.Item);
            }
            catch (Exception ex)
            {
                // An unhandled exception would terminate this worker thread and leave the caller
                // blocked in Execute forever; report it as a failure to the caller instead.
                item.Error = ex;
                item.ProcessResult = ProcessResult.FailThrow;
            }

            if (item.ProcessResult == ProcessResult.FailRequeue)
            {
                _queue.Enqueue(item);
                continue;
            }

            item.resetEvent.Set();
        } while (!_threadaborted);
    }

    /// <summary>
    /// Creates the queue and starts <paramref name="threads"/> worker threads.
    /// </summary>
    /// <param name="threads">number of worker threads</param>
    protected QueuedExecution(int threads)
    {
        _threadcount = threads;
        Initialize();
    }

    /// <summary>
    /// Execute call in queue, block until processed
    /// <para>NOTE(review): items still queued when Dispose is called are never processed, so
    /// their callers stay blocked here.</para>
    /// </summary>
    /// <param name="item">item handed to ProcessQueue</param>
    /// <exception cref="InvalidOperationException">ProcessQueue returned FailThrow or threw
    /// (the thrown exception, if any, is the InnerException).</exception>
    protected void Execute(IItem item)
    {
        // Not disposed: ManualResetEventSlim.Dispose must not run concurrently with Set(),
        // which the worker may still be executing when Wait() returns.
        var resetevent = new ManualResetEventSlim();
        var qi = new QueueItem { Item = item, resetEvent = resetevent };
        _queue.Enqueue(qi);
        resetevent.Wait();
        if (qi.ProcessResult == ProcessResult.FailThrow)
        {
            throw new InvalidOperationException("execution failed", qi.Error);
        }
    }

    #region IDisposable Members

    /// <summary>
    /// Returns true when no worker thread is still alive.
    /// </summary>
    private bool AllThreadsStopped()
    {
        foreach (var t in _threads)
        {
            if (t.IsAlive)
            {
                return false;
            }
        }
        return true;
    }

    /// <summary>
    /// cleanup: signals all worker threads to stop.
    /// </summary>
    /// <param name="waitForFinish">true: wait up to <paramref name="wait"/> for the workers to
    /// finish their current request, then abort any still alive. false: only signal the workers and
    /// return immediately; they exit after their current ProcessQueue call returns (no abort).</param>
    /// <param name="wait">time to wait for the threads to stop before aborting them
    /// (zero or negative: check once, then abort)</param>
    public void Dispose(bool waitForFinish, TimeSpan wait)
    {
        _threadaborted = true;

        if (!waitForFinish)
        {
            return;
        }

        // The deadline is measured once. Recomputing "now + wait" on every iteration would
        // mean the timeout never expires.
        var timer = System.Diagnostics.Stopwatch.StartNew();
        bool allaborted;
        while (true)
        {
            allaborted = AllThreadsStopped();
            if (allaborted || timer.Elapsed >= wait)
            {
                break;
            }
            Thread.Sleep(1);
        }

        if (allaborted)
        {
            return;
        }

        // Not all threads stopped in time: abort them.
        foreach (var t in _threads)
        {
            if (!t.IsAlive)
            {
                continue;
            }
            try
            {
                t.Abort();
            }
            catch (PlatformNotSupportedException)
            {
                // Thread.Abort is unsupported on .NET Core / .NET 5+. The thread has already been
                // signalled via _threadaborted and exits once its current ProcessQueue call returns.
            }
        }
    }

    /// <summary>
    /// Signals the worker threads to stop and returns without waiting or aborting them.
    /// </summary>
    public void Dispose()
    {
        Dispose(false, TimeSpan.MinValue);
    }

    #endregion
}
You can find the source code here:
https://github.com/drorgl/ForBlog/tree/master/FileCollisionTests
0 comments:
Post a Comment