[botan-devel] Threaded Filters/Operation Parallelisation

Jack Lloyd lloyd at randombit.net
Fri Jan 11 15:22:38 EST 2013


On Thu, Jan 10, 2013 at 01:13:09AM +0000, Joel Low wrote:
> Thanks Jack for writing that test.
> 
> My program is pretty similar to that, just that mine deals directly with files 
> and not a memory buffer, so your loop is much tighter than mine (no file I/O), 
> exposing the race condition. I forgot to lock the mutex before notifying the 
> workers, so it could have signalled while the workers were still busy.
> 
> I've attached the patch to apply *relative to the previous patch* so you'll 
> have to apply that patch first, then this one to make v2 of the patch.

I'm still seeing a deadlock. It looks like what happens is that all
threads expect to receive the more-work condition notification, but
sometimes one or more worker threads don't come around the loop to
block in the wait until after the notification has been sent. The
notification is then lost, causing a deadlock when
thread_delegate_work waits on the output complete semaphore, which is
never signalled by that thread.

I changed it to use a second semaphore instead which seems to resolve
it nicely for me. Can you take a look at the attached patch (taken
against mainline in C++11 so maybe not usable on VC)?

-Jack
-------------- next part --------------
#
# old_revision [69dedff16fdd75876bb9b2b18588f6909bc6a53b]
#
# add_file "src/filters/threaded_fork.cpp"
#  content [f136c3c453f45ad5e95148146694e7709e990b59]
# 
# add_file "src/utils/semaphore.cpp"
#  content [5b0bf2cd17ef35bc799b1f255d4883f52ae5aefa]
# 
# add_file "src/utils/semaphore.h"
#  content [f3ed44a6b43c9401aa262e3b9c02ae359c32e24c]
# 
# patch "doc/license.rst"
#  from [53baada06912fdead9320e307260809ac60cefff]
#    to [f28b291e2b83c2212dfac141652699f759de6ce8]
# 
# patch "src/filters/basefilt.h"
#  from [9909b1885f657e3aac8cbfede57928ed481cf757]
#    to [5434e9357b3f6edc7b933126cb525227d09d68d4]
# 
# patch "src/filters/filter.h"
#  from [e18d8b9397a9f429e917aa5baf46689b53a85291]
#    to [cefd989049b1dd6593a17aeeaeb8cda2e680a209]
# 
# patch "src/filters/info.txt"
#  from [9380a633685d912ca4cba11b61fcb32ebe7ea060]
#    to [b37d3de5ee7ac7914181c201f1186997c52fa67c]
# 
# patch "src/utils/info.txt"
#  from [1fd0bdc49e01581b411c3aeafe3776b95f17aebf]
#    to [c6456e7c7d0ac32668524e07d116b525e501255e]
#
============================================================
--- doc/license.rst	53baada06912fdead9320e307260809ac60cefff
+++ doc/license.rst	f28b291e2b83c2212dfac141652699f759de6ce8
@@ -25,6 +25,7 @@ Botan (http://botan.randombit.net/) is d
                 2010 Olivier de Gaalon
                 2012 Vojtech Kral
                 2012 Markus Wanner
+                2013 Joel Low
   All rights reserved.
 
   Redistribution and use in source and binary forms, with or without
============================================================
--- src/filters/basefilt.h	9909b1885f657e3aac8cbfede57928ed481cf757
+++ src/filters/basefilt.h	5434e9357b3f6edc7b933126cb525227d09d68d4
@@ -1,6 +1,7 @@
 /*
 * Basic Filters
 * (C) 1999-2007 Jack Lloyd
+* (C) 2013 Joel Low
 *
 * Distributed under the terms of the Botan license
 */
@@ -9,6 +10,8 @@
 #define BOTAN_BASEFILT_H__
 
 #include <botan/filter.h>
+#include <thread>
+#include <memory>
 
 namespace Botan {
 
@@ -76,6 +79,42 @@ class BOTAN_DLL Fork : public Fanout_Fil
       Fork(Filter* filter_arr[], size_t length);
    };
 
+/**
+* This class is a threaded version of the Fork filter. While this uses
+* threads, the class itself is NOT thread-safe. This is meant as a
+* drop-in replacement for Fork where performance gains are possible.
+*/
+class BOTAN_DLL Threaded_Fork : public Fork
+   {
+   public:
+      std::string name() const;
+
+      /**
+      * Construct a Threaded_Fork filter with up to four forks.
+      */
+      Threaded_Fork(Filter*, Filter*, Filter* = nullptr, Filter* = nullptr);
+
+      /**
+      * Construct a Threaded_Fork from range of filters
+      * @param filter_arr the list of filters
+      * @param length how many filters
+      */
+      Threaded_Fork(Filter* filter_arr[], size_t length);
+
+      ~Threaded_Fork();
+
+   protected:
+      void set_next(Filter* f[], size_t n);
+      void send(const byte in[], size_t length);
+
+   private:
+      void thread_delegate_work(const byte input[], size_t length);
+      void thread_entry(Filter* filter);
+
+      std::vector<std::shared_ptr<std::thread>> m_threads;
+      std::unique_ptr<struct Threaded_Fork_Data> m_thread_data;
+   };
+
 }
 
 #endif
============================================================
--- src/filters/filter.h	e18d8b9397a9f429e917aa5baf46689b53a85291
+++ src/filters/filter.h	cefd989049b1dd6593a17aeeaeb8cda2e680a209
@@ -1,6 +1,7 @@
 /*
 * Filter
 * (C) 1999-2007 Jack Lloyd
+* (C) 2013 Joel Low
 *
 * Distributed under the terms of the Botan license
 */
@@ -56,7 +57,7 @@ class BOTAN_DLL Filter
       * @param in some input for the filter
       * @param length the length of in
       */
-      void send(const byte in[], size_t length);
+      virtual void send(const byte in[], size_t length);
 
       /**
       * @param in some input for the filter
@@ -161,6 +162,12 @@ class BOTAN_DLL Fanout_Filter : public F
       void set_next(Filter* f[], size_t n) { Filter::set_next(f, n); }
 
       void attach(Filter* f) { Filter::attach(f); }
+
+   private:
+      friend class Threaded_Fork;
+      using Filter::write_queue;
+      using Filter::total_ports;
+      using Filter::next;
    };
 
 /**
============================================================
--- src/filters/info.txt	9380a633685d912ca4cba11b61fcb32ebe7ea060
+++ src/filters/info.txt	b37d3de5ee7ac7914181c201f1186997c52fa67c
@@ -12,6 +12,7 @@
 pipe_io.cpp
 pipe_rw.cpp
 secqueue.cpp
+threaded_fork.cpp
 </source>
 
 <header:public>
============================================================
--- /dev/null	
+++ src/filters/threaded_fork.cpp	f136c3c453f45ad5e95148146694e7709e990b59
@@ -0,0 +1,157 @@
+/*
+* Threaded Fork
+* (C) 2013 Joel Low
+*
+* Distributed under the terms of the Botan license
+*/
+
+#include <botan/basefilt.h>
+#include <botan/internal/semaphore.h>
+#include <atomic>
+
+namespace Botan {
+
+struct Threaded_Fork_Data
+   {
+   public:
+      Threaded_Fork_Data() : m_input_ready_semaphore(0),
+                             m_input_complete_semaphore(0),
+                             m_quit_condition(false)
+      {
+      }
+
+   /*
+   * Semaphore for indicating that there is work to be done (or to
+   * quit)
+   */
+   Semaphore m_input_ready_semaphore;
+
+   /*
+   * The work that needs to be done. This should only be modified when the
+   * threads are NOT running (i.e. before notifying the work condition, after
+   * the input_complete_semaphore is completely reset.)
+   */
+   const byte* m_input;
+
+   /*
+   * The length of the work that needs to be done.
+   */
+   size_t m_input_length;
+
+   /*
+   * Ensures that all threads have completed processing data.
+   */
+   Semaphore m_input_complete_semaphore;
+
+   /*
+   * Signals that all worker threads should quit.
+   */
+   std::atomic<bool> m_quit_condition;
+   };
+
+/*
+* Threaded_Fork constructor
+*/
+Threaded_Fork::Threaded_Fork(Filter* f1, Filter* f2, Filter* f3, Filter* f4) :
+   Fork(nullptr, static_cast<size_t>(0)),
+   m_thread_data(new Threaded_Fork_Data)
+   {
+   Filter* filters[4] = { f1, f2, f3, f4 };
+   set_next(filters, 4);
+   }
+
+/*
+* Threaded_Fork constructor
+*/
+Threaded_Fork::Threaded_Fork(Filter* filters[], size_t count) :
+   Fork(nullptr, static_cast<size_t>(0)),
+   m_thread_data(new Threaded_Fork_Data)
+   {
+   set_next(filters, count);
+   }
+
+Threaded_Fork::~Threaded_Fork()
+   {
+   m_thread_data->m_quit_condition = true;
+
+   m_thread_data->m_input_ready_semaphore.signal(m_threads.size());
+
+   for(auto& thread : m_threads)
+     thread->join();
+   }
+
+std::string Threaded_Fork::name() const
+   {
+   return "Threaded Fork";
+   }
+
+void Threaded_Fork::set_next(Filter* f[], size_t n)
+   {
+   Fork::set_next(f, n);
+   n = next.size();
+
+   if(n < m_threads.size())
+      m_threads.resize(n);
+   else
+      {
+      m_threads.reserve(n);
+      for(size_t i = m_threads.size(); i != n; ++i)
+         {
+         m_threads.push_back(
+            std::shared_ptr<std::thread>(
+               new std::thread(
+                  std::bind(&Threaded_Fork::thread_entry, this, next[i]))));
+         }
+      }
+   }
+
+void Threaded_Fork::send(const byte input[], size_t length)
+   {
+   if(write_queue.size())
+      thread_delegate_work(&write_queue[0], write_queue.size());
+   thread_delegate_work(input, length);
+
+   bool nothing_attached = true;
+   for(size_t j = 0; j != total_ports(); ++j)
+      if(next[j])
+         nothing_attached = false;
+
+   if(nothing_attached)
+      write_queue += std::make_pair(input, length);
+   else
+      write_queue.clear();
+   }
+
+void Threaded_Fork::thread_delegate_work(const byte input[], size_t length)
+   {
+   //Set the data to do.
+   m_thread_data->m_input = input;
+   m_thread_data->m_input_length = length;
+
+   //Let the workers start processing.
+   m_thread_data->m_input_ready_semaphore.signal(total_ports());
+
+   //Wait for all the filters to finish processing.
+   for(size_t i = 0; i != total_ports(); ++i)
+      m_thread_data->m_input_complete_semaphore.wait();
+
+   //Reset the thread data
+   m_thread_data->m_input = nullptr;
+   m_thread_data->m_input_length = 0;
+   }
+
+void Threaded_Fork::thread_entry(Filter* filter)
+   {
+   while(true)
+      {
+      m_thread_data->m_input_ready_semaphore.wait();
+
+      if(m_thread_data->m_quit_condition)
+         break;
+
+      filter->write(m_thread_data->m_input, m_thread_data->m_input_length);
+      m_thread_data->m_input_complete_semaphore.signal();
+      }
+   }
+
+}
============================================================
--- src/utils/info.txt	1fd0bdc49e01581b411c3aeafe3776b95f17aebf
+++ src/utils/info.txt	c6456e7c7d0ac32668524e07d116b525e501255e
@@ -8,6 +8,7 @@
 charset.cpp
 cpuid.cpp
 parsing.cpp
+semaphore.cpp
 version.cpp
 zero_mem.cpp
 </source>
@@ -17,6 +18,7 @@
 bit_ops.h
 prefetch.h
 rounding.h
+semaphore.h
 stl_util.h
 xor_buf.h
 </header:internal>
@@ -27,13 +29,14 @@
 charset.h
 cpuid.h
 exceptn.h
+get_byte.h
 loadstor.h
 mem_ops.h
 parsing.h
 rotate.h
+semaphore.h
 types.h
 version.h
-get_byte.h
 </header:public>
 
 <libs>
============================================================
--- /dev/null	
+++ src/utils/semaphore.cpp	5b0bf2cd17ef35bc799b1f255d4883f52ae5aefa
@@ -0,0 +1,55 @@
+/*
+* Semaphore
+* by Pierre Gaston (http://p9as.blogspot.com/2012/06/c11-semaphores.html)
+* modified by Joel Low for Botan
+*
+*/
+
+#include <botan/internal/semaphore.h>
+
+namespace Botan {
+
+void Semaphore::signal(size_t n)
+   {
+#if 1
+   for(size_t i = 0; i != n; ++i)
+      this->signal();
+#else
+
+   std::lock_guard<std::mutex> lock(m_mutex);
+   m_value += n;
+
+
+
+   if(m_value <= 0)
+      {
+      ++m_wakeups;
+      m_cond.notify_one();
+      }
+
+#endif
+   }
+
+void Semaphore::signal()
+   {
+   std::lock_guard<std::mutex> lock(m_mutex);
+   ++m_value;
+   if(m_value <= 0)
+      {
+      ++m_wakeups;
+      m_cond.notify_one();
+      }
+   }
+
+void Semaphore::wait()
+   {
+   std::unique_lock<std::mutex> lock(m_mutex);
+   --m_value;
+   if(m_value < 0)
+      {
+      m_cond.wait(lock, [this] { return m_wakeups > 0; });
+      --m_wakeups;
+      }
+   }
+
+}
============================================================
--- /dev/null	
+++ src/utils/semaphore.h	f3ed44a6b43c9401aa262e3b9c02ae359c32e24c
@@ -0,0 +1,35 @@
+/*
+* Semaphore
+* by Pierre Gaston (http://p9as.blogspot.com/2012/06/c11-semaphores.html)
+* modified by Joel Low for Botan
+*
+*/
+
+#ifndef BOTAN_SEMAPHORE_H__
+#define BOTAN_SEMAPHORE_H__
+
+#include <mutex>
+#include <condition_variable>
+
+namespace Botan {
+
+class Semaphore
+   {
+   public:
+      Semaphore(int value) : m_value(value), m_wakeups(0) {}
+
+      void wait();
+      void signal();
+
+      void signal(size_t n);
+
+   private:
+      int m_value;
+      int m_wakeups;
+      std::mutex m_mutex;
+      std::condition_variable m_cond;
+   };
+
+}
+
+#endif


More information about the botan-devel mailing list