[erlang-gen_leader] Initial commit (see review request in rhbz #638906)

Peter Lemenkov peter at fedoraproject.org
Mon Oct 4 19:19:21 UTC 2010


commit d6ee90a5b54c7ec7f5a956e272a806d602963f4d
Author: Peter Lemenkov <lemenkov at gmail.com>
Date:   Mon Oct 4 23:19:18 2010 +0400

    Initial commit (see review request in rhbz #638906)
    
    Signed-off-by: Peter Lemenkov <lemenkov at gmail.com>

 README.markdown        |   25 +
 erlang-gen_leader.spec |   61 +++
 gen_leader.erl         | 1323 ++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 1409 insertions(+), 0 deletions(-)
---
diff --git a/README.markdown b/README.markdown
new file mode 100644
index 0000000..b5a5e33
--- /dev/null
+++ b/README.markdown
@@ -0,0 +1,25 @@
+Mission
+=======
+
+This is a project to revive and modernize the [gen_leader library](http://www.cs.chalmers.se/~hanssv/leader_election/doc/gen_leader.html).
+Numerous versions of this project exist, developed by disparate groups with different aims. By collecting and integrating these we hope
+to provide a new standard-library-quality module for the Erlang runtime that provides leader-election functionality without many of the
+difficulties traditionally associated with such.
+
+Which Version Should I Use?
+===========================
+
+Use the version in combined_version.
+
+What Exactly Does It Do?
+========================
+
+Let us get back to you on that.
+
+Current Participants
+====================
+
++ Andrew Thompson (andrew at hijacked.us)
++ Dave Fayram (dfayram at gmail.com)
++ Hans Svensson (hanssv at chalmers.se)
++ Ulf Wiger (ulf at wiger.net)
\ No newline at end of file
diff --git a/erlang-gen_leader.spec b/erlang-gen_leader.spec
new file mode 100644
index 0000000..1ac22f9
--- /dev/null
+++ b/erlang-gen_leader.spec
@@ -0,0 +1,61 @@
+%global realname gen_leader
+%global debug_package %{nil}
+
+
+Name:		erlang-%{realname}
+Version:	0
+Release:	0.2%{?dist}
+Summary:	A leader election behavior modeled after gen_server
+Group:		Development/Libraries
+License:	ERPL
+URL:		http://github.com/KirinDave/gen_leader_revival
+Source0:	http://github.com/KirinDave/gen_leader_revival/raw/master/combined_version/gen_leader.erl
+Source1:	http://github.com/KirinDave/gen_leader_revival/raw/master//README.markdown
+BuildRoot:	%(mktemp -ud %{_tmppath}/%{name}-%{version}-%{release}-XXXXXX)
+BuildRequires:	erlang-erts
+Requires:	erlang-erts >= R12B-5
+Requires:	erlang-kernel >= R12B-5
+Requires:	erlang-stdlib >= R12B-5
+
+%description
+This application implements a leader election behavior modeled after gen_server.
+This behavior intends to make it reasonably straightforward to implement a fully
+distributed server with master-slave semantics.
+
+
+%prep
+%setup -q -T -c -n %{realname}-%{version}
+mkdir ebin
+cp %{SOURCE0} .
+cp %{SOURCE1} README
+
+
+%build
+erlc +debug_info -o ebin %{realname}.erl
+
+
+%install
+rm -rf $RPM_BUILD_ROOT
+install -D -m 644 ebin/%{realname}.beam $RPM_BUILD_ROOT%{_libdir}/erlang/lib/%{realname}-%{version}/ebin/%{realname}.beam
+
+
+%clean
+rm -rf $RPM_BUILD_ROOT
+
+
+%files
+%defattr(-,root,root,-)
+%doc README
+%dir %{_libdir}/erlang/lib/%{realname}-%{version}
+%dir %{_libdir}/erlang/lib/%{realname}-%{version}/ebin
+%{_libdir}/erlang/lib/%{realname}-%{version}/ebin/%{realname}.beam
+
+
+%changelog
+* Mon Oct  4 2010 Peter Lemenkov <lemenkov at gmail.com> - 0-0.2
+- Added README file to suppress one rpmlint warning
+- Ensured that package is built with debug info
+
+* Thu Sep 30 2010 Peter Lemenkov <lemenkov at gmail.com> - 0-0.1
+- Initial package
+
diff --git a/gen_leader.erl b/gen_leader.erl
new file mode 100644
index 0000000..8d67da9
--- /dev/null
+++ b/gen_leader.erl
@@ -0,0 +1,1323 @@
+%%% ``The contents of this file are subject to the Erlang Public License,
+%%% Version 1.1, (the "License"); you may not use this file except in
+%%% compliance with the License. You should have received a copy of the
+%%% Erlang Public License along with this software. If not, it can be
+%%% retrieved via the world wide web at http://www.erlang.org/.
+%%%
+%%% Software distributed under the License is distributed on an "AS IS"
+%%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%%% the License for the specific language governing rights and limitations
+%%% under the License.
+%%%
+%%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
+%%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
+%%% AB. All Rights Reserved.''
+%%%
+%%%
+%%%     $Id: gen_leader.erl,v 1.4 2008/09/19 07:40:15 hanssv Exp $
+%%%
+%%% @author Hans Svensson <hanssv at chalmers.se>
+%%% @author Thomas Arts <thomas.arts at ituniv.se>
+%%% @author Ulf Wiger <ulf.wiger at ericsson.com>
+%%% @author (contributor: Serge Aleynikov <saleyn at gmail.com>)
+%%%
+%%% @doc Leader election behavior.
+%%% <p>This application implements a leader election behavior modeled after
+%%% gen_server. This behavior intends to make it reasonably
+%%% straightforward to implement a fully distributed server with
+%%% master-slave semantics.</p>
+%%% <p>The gen_leader behavior supports nearly everything that gen_server
+%%% does (some functions, such as multicall() and the internal timeout,
+%%% have been removed), and adds a few callbacks and API functions to
+%%% support leader election etc.</p>
+%%% <p>Also included is an example program, a global dictionary, based
+%%% on the modules gen_leader and dict. The callback implementing the
+%%% global dictionary is called 'test_cb', for no particularly logical
+%%% reason.</p>
+%%% <p><b>New version:</b> The internal leader election algorithm was faulty
+%%% and has been replaced with a new version based on a different leader
+%%% election algorithm. As a consequence of this the query functions
+%%% <tt>alive</tt> and <tt>down</tt> can no longer be provided.
+%%% The new algorithm also makes use of an incarnation parameter, by
+%%% default written to disk in the function <tt>incarnation</tt>. This
+%%% implies that only one <tt>gen_leader</tt> per node is permitted, if
+%%% used in a diskless environment, <tt>incarnation</tt> must be adapted.
+%%% </p>
+%%% <p>
+%%% Modifications contributed by Serge Aleynikov:
+%%% <ol>
+%%% <li>Added configurable startup options (see leader_options() type)</li>
+%%% <li>Implemented handle_DOWN/3 callback with propagation of the 
+%%%     leader's state via broadcast to all connected candidates.</li>
+%%% <li>Fixed population of the #election.down member so that down/1 query
+%%%     can be used in the behavior's implementation</li>
+%%% <li>Rewrote implementation of the tau timer to prevent the leader 
+%%%     looping on the timer timeout event when all candidates are connected.</li>
+%%% </ol>
+%%% </p>
+%%% @end
+%%%
+%%% @type leader_options() = [OptArg]
+%%%          OptArg = {workers,   Workers::[node()]}    |
+%%%                   {vardir,    Dir::string()}        |
+%%%                   {bcast_type,Type::bcast_type()}   |
+%%%                   {heartbeat, Seconds::integer()}
+%%% @type bcast_type() = all | sender.
+%%%         Notification control of candidate membership changes. `all' 
+%%%         means that returns from the handle_DOWN/3 and elected/3 leader's events 
+%%%         will be broadcast to all candidates. 
+%%% @type election() = tuple(). Opaque state of the gen_leader behaviour.
+%%% @type node() = atom(). A node name.
+%%% @type name() = atom(). A locally registered name.
+%%% @type serverRef() = Name | {name(),node()} | {global,Name} | pid().
+%%%   See gen_server.
+%%% @type callerRef() = {pid(), reference()}. See gen_server.
+%%%
+-module(gen_leader).
+
+%% Time between rounds of query from the leader
+-define(TAU,5000).
+
+-export([start/6,
+         start_link/6,
+         leader_call/2, leader_call/3, leader_cast/2,
+         call/2, call/3, cast/2,
+         reply/2]).
+
+%% Query functions
+-export([alive/1,
+         down/1,
+         candidates/1,
+         workers/1,
+         broadcast/3,
+         leader_node/1]).
+
+-export([system_continue/3,
+         system_terminate/4,
+         system_code_change/4,
+         format_status/2,
+         worker_announce/2
+        ]).
+
+-export([behaviour_info/1]).
+
+%% Internal exports
+-export([init_it/6, 
+         print_event/3
+         %%, safe_send/2
+        ]).
+
+-import(error_logger , [format/2]).
+
+-import(lists, [foldl/3,
+                foreach/2,
+                member/2,
+                keydelete/3,
+                keysearch/3]).
+
+-record(election, {
+          leader = none,          %% leader process as learned from ldr/activateWorker, or none
+          name,                   %% locally registered name of this gen_leader process
+          leadernode = none,      %% node of the current leader, or none
+          candidate_nodes = [],   %% nodes capable of assuming the leader role
+          worker_nodes = [],      %% nodes in the cluster that never become leader
+          alive = [],
+          down = [],              %% candidate nodes currently considered down
+          monitored = [],         %% [{MonitorRef, Node}]
+          buffered = [],
+          status,                 %% e.g. elec1 | elec2 | wait | norm | worker | waiting_worker
+          elid,                   %% election id (the T carried in protocol messages)
+          acks = [],              %% nodes that sent ackLeader during stage 2
+          work_down = [],         %% presumably worker nodes considered down - TODO confirm
+          cand_timer_int,         %% heartbeat interval in milliseconds
+          cand_timer,             %% candidate timer reference; undefined when cancelled
+          pendack,                %% node whose ackLeader is currently awaited
+          incarn,                 %% value returned by incarnation/3 at start-up
+          nextel,
+          bcast_type              %% all | sender. When `all' each election event
+          %% will be broadcast to all candidate nodes.
+         }).
+
+-record(server, {
+          parent,
+          mod,
+          state,
+          debug
+         }).
+
+%%% ---------------------------------------------------
+%%% Interface functions.
+%%% ---------------------------------------------------
+
+%% @hidden
+behaviour_info(callbacks) ->
+    [{init,1},
+     {elected,3},
+     {surrendered,3},
+     {handle_leader_call,4},
+     {handle_leader_cast,3},
+     {from_leader,3},
+     {handle_call,4},
+     {handle_cast,3},
+     {handle_DOWN,3},
+     {handle_info,2},
+     {terminate,2},
+     {code_change,4}];
+behaviour_info(_Other) ->
+    undefined.
+
+%% @spec (Name::name(), CandidateNodes::[node()],
+%%             OptArgs::leader_options(), Mod::atom(), Arg, Options::list()) ->
+%%          {ok,pid()}
+%%
+%% @doc Starts a gen_leader process without linking to the parent.
+%% @see start_link/6
+start(Name, CandidateNodes, OptArgs, Mod, Arg, Options)
+  when is_atom(Name), is_list(CandidateNodes), is_list(OptArgs) ->
+    gen:start(?MODULE, nolink, {local,Name},
+              Mod, {CandidateNodes, OptArgs, Arg}, Options).
+
+%% @spec (Name::name(), CandidateNodes::[node()],
+%%             OptArgs::leader_options(), Mod::atom(), Arg, Options::list()) ->
+%%          {ok,pid()}
+%%
+%% @doc Starts a gen_leader process.
+%% <table>
+%%  <tr><td>Name</td><td>The locally registered name of the process</td></tr>
+%%  <tr><td>CandidateNodes</td><td>The names of nodes capable of assuming
+%%     a leadership role</td></tr>
+%%  <tr><td valign="top">OptArgs</td>
+%%      <td>Optional arguments given to `gen_leader'.
+%%          <dl>
+%%          <dt>{workers, Workers}</dt>
+%%          <dd>The names of nodes that will be part of the "cluster",
+%%              but cannot ever assume a leadership role. Default: [].</dd>
+%%          <dt>{vardir, Dir}</dt>
+%%          <dd>Directory name used to store candidate's incarnation cookie.
+%%              Default: "."</dd>
+%%          <dt>{bcast_type, Type}</dt>
+%%          <dd>When `Type' is `all' each election event (when a new
+%%              candidate becomes visible to the leader) will be broadcast
+%%              to all live candidate nodes.  Each candidate will get
+%%              a from_leader/3 callback. When `Type' is `sender', only
+%%              the newly registered candidate will get the surrendered/3
+%%              callback. Default: `sender'.</dd>
+%%          <dt>{heartbeat, Seconds}</dt>
+%%          <dd>Heartbeat timeout value used to send ping messages to inactive
+%%              candidate nodes. Default: 5.</dd>
+%%          </dl>
+%%      </td></tr>
+%%  <tr><td>Mod</td><td>The name of the callback module</td></tr>
+%%  <tr><td>Arg</td><td>Argument passed on to <code>Mod:init/1</code></td></tr>
+%%  <tr><td>Options</td><td>Same as gen_server's Options</td></tr>
+%% </table>
+%%
+%% <p>The list of candidates needs to be known from the start. Workers
+%% could potentially be added at runtime, but no functionality to do
+%% this is provided by this version.</p>
+%% @end
+start_link(Name, CandidateNodes, OptArgs, Mod, Arg, Options)
+  when is_atom(Name), is_list(CandidateNodes), is_list(OptArgs) ->
+    gen:start(?MODULE, link, {local,Name},
+              Mod, {CandidateNodes, OptArgs, Arg}, Options).
+
+%% Query functions to be used from the callback module
+
+alive(#election{alive = Alive}) ->
+    Alive.
+
+down(#election{down = Down}) ->
+    Down.
+
+%% @spec (E::election()) -> node() | none
+%% @doc Returns the current leader node.
+%%
+leader_node(#election{leadernode=Leader}) ->
+    Leader.
+
+%% @spec candidates(E::election()) -> [node()]
+%% @doc Returns a list of known candidates.
+%%
+candidates(#election{candidate_nodes = Cands}) ->
+    Cands.
+
+%% @spec workers(E::election()) -> [node()]
+%% @doc Returns a list of known workers.
+%%
+workers(#election{worker_nodes = Workers}) ->
+    Workers.
+
+%% Used by dynamically added workers.
+%% @hidden
+worker_announce(Name, Pid) ->
+  Name ! {add_worker, Pid},
+  Name ! {heartbeat, Pid}.
+
+%%
+%% Make a call to a generic server.
+%% If the server is located at another node, that node will
+%% be monitored.
+%% If the client is trapping exits and is linked server termination
+%% is handled here (? Shall we do that here (or rely on timeouts) ?).
+%%
+%% @spec call(Name::serverRef(), Request) -> term()
+%%
+%% @doc Equivalent to <code>gen_server:call/2</code>, but with a slightly
+%% different exit reason if something goes wrong. This function calls
+%% the <code>gen_leader</code> process exactly as if it were a gen_server
+%% (which, for practical purposes, it is.)
+%% @end
+call(Name, Request) ->
+    case catch gen:call(Name, '$gen_call', Request) of
+        {ok,Res} ->
+            Res;
+        {'EXIT',Reason} ->
+            exit({Reason, {?MODULE, local_call, [Name, Request]}})
+    end.
+
+%% @spec call(Name::serverRef(), Request, Timeout::integer()) ->
+%%     Reply
+%%
+%%     Reply = term()
+%%
+%% @doc Equivalent to <code>gen_server:call/3</code>, but with a slightly
+%% different exit reason if something goes wrong. This function calls
+%% the <code>gen_leader</code> process exactly as if it were a gen_server
+%% (which, for practical purposes, it is.)
+%% @end
+call(Name, Request, Timeout) ->
+    case catch gen:call(Name, '$gen_call', Request, Timeout) of
+        {ok,Res} ->
+            Res;
+        {'EXIT',Reason} ->
+            exit({Reason, {?MODULE, local_call, [Name, Request, Timeout]}})
+    end.
+
+%% @spec leader_call(Name::name(), Request::term())
+%%    -> Reply
+%%
+%%    Reply = term()
+%%
+%% @doc Makes a call (similar to <code>gen_server:call/2</code>) to the
+%% leader. The call is forwarded via the local gen_leader instance, if
+%% that one isn't actually the leader. The client will exit if the
+%% leader dies while the request is outstanding.
+%% <p>This function uses <code>gen:call/3</code>, and is subject to the
+%% same default timeout as e.g. <code>gen_server:call/2</code>.</p>
+%% @end
+%%
+leader_call(Name, Request) ->
+    case catch gen:call(Name, '$leader_call', Request) of
+        {ok,{leader,reply,Res}} ->
+            Res;
+        {ok,{error, leader_died}} ->
+            exit({leader_died, {?MODULE, leader_call, [Name, Request]}});
+        {'EXIT',Reason} ->
+            exit({Reason, {?MODULE, leader_call, [Name, Request]}})
+    end.
+
+%% @spec leader_call(Name::name(), Request::term(), Timeout::integer())
+%%    -> Reply
+%%    Reply = term()
+%%
+%% @doc Makes a call (similar to <code>gen_server:call/3</code>) to the
+%% leader. The call is forwarded via the local gen_leader instance, if
+%% that one isn't actually the leader. The client exits with reason
+%% `leader_died' (as leader_call/2 does) if the leader dies meanwhile.
+%% @end
+leader_call(Name, Request, Timeout) ->
+    case catch gen:call(Name, '$leader_call', Request, Timeout) of
+        {ok,{leader,reply,Res}} ->
+            Res;
+        {ok,{error, leader_died}} ->
+            exit({leader_died, {?MODULE, leader_call, [Name, Request, Timeout]}});
+        {'EXIT',Reason} ->
+            exit({Reason, {?MODULE, leader_call, [Name, Request, Timeout]}})
+    end.
+
+
+%% @equiv gen_server:cast/2
+cast(Name, Request) ->
+    catch do_cast('$gen_cast', Name, Request),
+    ok.
+
+%% @spec leader_cast(Name::name(), Msg::term()) -> ok
+%% @doc Similar to <code>gen_server:cast/2</code> but will be forwarded to
+%% the leader via the local gen_leader instance.
+leader_cast(Name, Request) ->
+    catch do_cast('$leader_cast', Name, Request),
+    ok.
+
+%% Sends {Tag, Request} to a registered name, a pid, or a {Name,Node} pair (not {global,_}).
+do_cast(Tag, Dest, Request) when is_atom(Dest); is_pid(Dest) ->
+    Dest ! {Tag, Request};
+do_cast(Tag, {Name,Node} = Dest, Request) when is_atom(Name), is_atom(Node), Name =/= global ->
+    Dest ! {Tag, Request}.
+
+
+%% @spec reply(From::callerRef(), Reply::term()) -> Void
+%% @equiv gen_server:reply/2
+reply({To, Tag}, Reply) ->
+    catch To ! {Tag, Reply}.
+
+
+%%% ---------------------------------------------------
+%%% Initiate the new process.
+%%% Register the name using the Rfunc function
+%%% Calls the Mod:init/Args function.
+%%% Finally an acknowledge is sent to Parent and the main
+%%% loop is entered.
+%%% ---------------------------------------------------
+%%% @hidden
+init_it(Starter, Parent, {local, Name}, Mod, {CandidateNodes, Workers, Arg}, Options) ->
+    %% R13B passes {local, Name} instead of just Name
+    init_it(Starter, Parent, Name, Mod,
+            {CandidateNodes, Workers, Arg}, Options);
+init_it(Starter, self, Name, Mod, {CandidateNodes, OptArgs, Arg}, Options) ->
+    init_it(Starter, self(), Name, Mod,
+            {CandidateNodes, OptArgs, Arg}, Options);
+init_it(Starter,Parent,Name,Mod,{CandidateNodes,OptArgs,Arg},Options) ->
+    Workers     = proplists:get_value(workers,   OptArgs, []),
+    VarDir      = proplists:get_value(vardir,    OptArgs, "."),
+    Interval    = proplists:get_value(heartbeat, OptArgs, ?TAU div 1000) * 1000,
+    BcastType   = proplists:get_value(bcast_type,OptArgs, sender),
+    Debug       = debug_options(Name, Options),
+    AmCandidate = member(node(), CandidateNodes),
+
+    Election    = #election{
+      candidate_nodes = CandidateNodes,
+      worker_nodes    = Workers,
+      name            = Name,
+      nextel          = 0,
+      cand_timer_int  = Interval,
+      bcast_type      = BcastType
+     },
+
+    case {AmCandidate, member(node(), Workers)} of
+        {false, false} ->
+            %% I am neither a candidate nor a worker - don't start this process
+            error_logger:warning_msg("~w not started - node is not a candidate/worker\n", [Name]),
+            proc_lib:init_ack(Starter, ignore),
+            exit(normal);
+        _ ->
+            ok
+    end,
+
+    case {catch Mod:init(Arg), AmCandidate} of
+        {{stop, Reason},_} ->
+            proc_lib:init_ack(Starter, {error, Reason}),
+            exit(Reason);
+        {ignore,_} ->
+            proc_lib:init_ack(Starter, ignore),
+            exit(normal);
+        {{'EXIT', Reason},_} ->
+            proc_lib:init_ack(Starter, {error, Reason}),
+            exit(Reason);
+        {{ok, State}, true} ->
+            NewE = startStage1(Election#election{incarn = incarnation(VarDir, Name, node())}),
+            proc_lib:init_ack(Starter, {ok, self()}),
+
+            % handle the case where there's only one candidate worker and we can't
+            % rely on DOWN messages to trigger the elected() call because we never get
+            % a DOWN for ourselves
+            case CandidateNodes =:= [node()] of
+                true ->
+                    % there's only one candidate leader; us
+                    hasBecomeLeader(NewE,#server{parent = Parent,mod = Mod,
+                        state = State,debug = Debug},{init});
+                false ->
+                    % more than one candidate worker, continue as normal
+                    safe_loop(#server{parent = Parent,mod = Mod,
+                              state = State,debug = Debug},
+                              candidate, NewE,{init})
+            end;
+        {{ok, State}, false} ->
+            proc_lib:init_ack(Starter, {ok, self()}),
+            case lists:member(self(), Workers) of 
+              false ->
+                rpc:multicall(CandidateNodes, gen_leader, 
+                              worker_announce, [Name, node(self())]);
+              _ -> nop
+            end,
+            safe_loop(#server{parent = Parent,mod = Mod,
+                              state = State,debug = Debug},
+                      waiting_worker, Election,{init});
+        {Else,_} ->
+            Error = {bad_return_value, Else},
+            proc_lib:init_ack(Starter, {error, Error}),
+            exit(Error)
+    end.
+
+
+%%% ---------------------------------------------------
+%%% The MAIN loops.
+%%% ---------------------------------------------------
+
+safe_loop(#server{mod = Mod, state = State} = Server, Role,
+          #election{name = Name} = E, _PrevMsg) ->
+    %% Event for QuickCheck
+    %% ?EVENT({Role,E}),
+  receive
+    {system, From, Req} ->
+      #server{parent = Parent, debug = Debug} = Server,
+      sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
+                            [safe, Server, Role, E]);
+    {'EXIT', _, Reason} = Msg ->
+      terminate(Reason, Msg, Server, Role, E);
+    {halt,T,From} = Msg ->
+      NewE = halting(E,T,From),
+      From ! {ackLeader,T,self()},
+      safe_loop(Server,Role,NewE,Msg);
+    {hasLeader,Ldr,T,_} = Msg ->
+      NewE1 = mon_node(E,Ldr),
+      case ( (E#election.status == elec2) and (E#election.acks /= []) ) of
+        true ->
+          lists:foreach(
+            fun(Node) ->
+                {Name,Node} ! {hasLeader,Ldr,T,self()}
+            end,E#election.acks);
+        false ->
+          ok
+      end,
+      NewE = NewE1#election{elid = T,
+                            status = wait,
+                            leadernode = node(Ldr),
+                            down = E#election.down -- [node(Ldr)],
+                            acks = []},
+      Ldr ! {isLeader,T,self()},
+      safe_loop(Server,Role,NewE,Msg);
+    {isLeader,T,From} = Msg ->
+      From ! {notLeader,T,self()},
+      safe_loop(Server,Role,E,Msg);
+    {notLeader,T,_} = Msg ->
+      case ( (E#election.status == wait) and (E#election.elid == T) ) of
+        true ->
+          NewE = startStage1(E);
+        false ->
+          NewE = E
+      end,
+      safe_loop(Server,Role,NewE,Msg);
+    {ackLeader,T,From} = Msg ->
+      case ( (E#election.status == elec2) and (E#election.elid == T) and
+             (E#election.pendack == node(From)) ) of
+        true ->
+          NewE = continStage2(
+                   E#election{acks = [node(From)|E#election.acks]});
+        false ->
+          NewE = E
+      end,
+      hasBecomeLeader(NewE,Server,Msg);
+    {ldr,Synch,T,Workers, From} = Msg ->
+      case ( (E#election.status == wait) and (E#election.elid == T) ) of
+        true ->
+          timer:cancel(E#election.cand_timer),
+          NewE1 = mon_node(E, From),
+          NewE = NewE1#election{leader = From,
+                                leadernode = node(From),
+                                worker_nodes = Workers,
+                                status = norm,
+                                cand_timer=undefined},
+                                                %io:format("Making ~p (myself) surrender. Workers should be passed as: ~p~n~n",
+                                                %          [self(), NewE]),
+                                                %io:format("~n~n~n ---Gen leader is forcing this instance to surrender.---~n~n~n"),
+          {ok,NewState} = Mod:surrendered(State,Synch,NewE),
+          loop(Server#server{state = NewState},surrendered,NewE,Msg);
+        false ->
+          safe_loop(Server,Role,E,Msg)
+      end;
+    {normQ,T,From} = Msg ->
+      case ( (E#election.status == elec1) or
+             ( (E#election.status == wait) and (E#election.elid == T))) of
+        true ->
+          NewE = halting(E,T,From),
+          From ! {notNorm,T,self()};
+        false ->
+          NewE = E
+      end,
+      safe_loop(Server,Role,NewE,Msg);
+
+    {notNorm,_,_} = Msg ->
+      safe_loop(Server,Role,E,Msg);
+    {workerAlive,T,From} = Msg ->
+      case E#election.leadernode == none of
+        true ->
+          %% We should initiate activation,
+          %% monitor the possible leader!
+          NewE = mon_node(E#election{leadernode = node(From), elid = T},
+                          From),
+          From ! {workerIsAlive,T,self()};
+        false ->
+          %% We should acutally ignore this, the present activation
+          %% will complete or abort first...
+          NewE = E
+      end,
+      safe_loop(Server,Role,NewE,Msg);
+    {workerIsAlive,_,_} = Msg ->
+      %% If this happens, the activation process should abort
+      %% This process is no longer the leader!
+      %% The sender will notice this via a DOWN message
+      safe_loop(Server,Role,E,Msg);
+    {activateWorker,T,Synch,From} = Msg ->
+      case ( (T == E#election.elid) and
+             (node(From) == E#election.leadernode)) of
+        true ->
+          NewE = E#election{ leader = From, status = worker },
+                                                %io:format("~n~n~n ---Gen leader is forcing this instance to surrender.---~n~n~n"),
+          {ok,NewState} = Mod:surrendered(State,Synch,NewE),
+          loop(Server#server{state = NewState},worker,NewE,Msg);
+        false ->
+          %% This should be a VERY special case...
+          %% But doing nothing is the right thing!
+          %% A DOWN message should arrive to solve this situation
+          safe_loop(Server,Role,E,Msg)
+      end;
+
+    {heartbeat, _Node} = Msg ->
+      %% io:format("Got {heartbeat, ~w} - ~w (safe_loop)\n", [_Node, E#election.leadernode]),
+      safe_loop(Server,Role,E,Msg);
+    {candidate_timer} = Msg ->
+      NewE =
+        case E#election.down of
+          [] ->
+            %% io:format("Canceling candidate timer (timer=~w\n  down=~w\n  role=~w, leader=~w)\n",
+            %%    [E#election.cand_timer, E#election.down, Role, E#election.leadernode]),
+            timer:cancel(E#election.cand_timer),
+            E#election{cand_timer = undefined};
+          Down ->
+            %% Some of potential master candidate nodes are down - try to wake them up
+            F = fun(N) ->
+                    %% io:format("Sending {heartbeat, ~w} to ~w\n", [N, node()]),
+                    {E#election.name, N} ! {heartbeat, node()}
+                end,
+            [F(N) || N <- Down, {ok, up} =/= net_kernel:node_info(N, state)],
+            E
+        end,
+      safe_loop(Server,Role,NewE,Msg);
+    {'DOWN',_Ref,process,From,_Reason} = Msg when Role==waiting_worker ->
+      %% We are only monitoring one proc, the leader!
+      Node = case From of
+               {Name,_Node} -> _Node;
+               _ when is_pid(From) -> node(From)
+             end,
+      case Node == E#election.leadernode of
+        true ->
+          NewE = E#election{ leader = none, leadernode = none,
+                             status = waiting_worker,
+                             monitored = []};
+        false ->
+          NewE = E
+      end,
+      safe_loop(Server, Role, NewE,Msg);
+    {'DOWN',Ref,process,From,_Reason} = Msg ->
+      Node = case From of
+               {Name,_Node} -> _Node;
+               _ when is_pid(From) -> node(From)
+             end,
+      NewMon = E#election.monitored -- [{Ref,Node}],
+      case lists:member(Node,E#election.candidate_nodes) of
+        true ->
+          NewDown = [Node | E#election.down],
+          E1 = E#election{down = NewDown, monitored = NewMon},
+          case ( pos(Node,E#election.candidate_nodes) <
+                 pos(node(),E#election.candidate_nodes) ) of
+            true ->
+              Lesser = lesser(node(),E#election.candidate_nodes),
+              LesserIsSubset = (Lesser -- NewDown) == [],
+              case ((E#election.status == wait) and
+                    (Node == E#election.leadernode)) of
+                true ->
+                  NewE = startStage1(E1);
+                false ->
+                  case ((E#election.status == elec1) and
+                        LesserIsSubset) of
+                    true ->
+                      NewE = startStage2(
+                               E1#election{down = Lesser});
+                    false ->
+                      NewE = E1
+                  end
+              end;
+            false ->
+              case ( (E#election.status == elec2) and
+                     (Node == E#election.pendack) ) of
+                true ->
+                  NewE = continStage2(E1);
+                false ->
+                  case ( (E#election.status == wait) and
+                         (Node == E#election.leadernode)) of
+                    true ->
+                      NewE = startStage1(E1);
+                    false ->
+                      NewE = E1
+                  end
+              end
+          end
+      end,
+      hasBecomeLeader(NewE,Server,Msg)
+    end.
+
+
+loop(#server{parent = Parent,
+             mod = Mod,
+             state = State,
+             debug = Debug} = Server, Role,
+     #election{name = Name} = E, _PrevMsg) ->
+    receive
+        Msg ->
+        %io:format("NORMAL LOOP ELECTION: ~n~p~n~n", [E]),
+            case Msg of
+                {system, From, Req} ->
+                    sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
+                                          [normal, Server, Role, E]);
+                {'EXIT', Parent, Reason} ->
+                    terminate(Reason, Msg, Server, Role, E);
+
+                {halt,_,From} ->
+                    From ! {hasLeader,E#election.leader,E#election.elid,self()},
+                    loop(Server,Role,E,Msg);
+                {hasLeader,_,_,_} ->
+                    loop(Server,Role,E,Msg);
+                {isLeader,T,From} ->
+                    case (self() == E#election.leader) of
+                        true ->
+                            NewE = mon_node(
+                                     E#election{down = E#election.down -- [node(From)]},
+                                     From),
+                            {ok,Synch,NewState} = Mod:elected(State,NewE,node(From)),
+                            From ! {ldr,Synch,E#election.elid,E#election.worker_nodes, self()},
+                            broadcast_candidates(NewE, Synch, [From]),
+                            loop(Server#server{state = NewState},Role,NewE,Msg);
+                        false ->
+                            From ! {notLeader,T,self()},
+                            loop(Server,Role,E,Msg)
+                    end;
+                {ackLeader,_,_} ->
+                    loop(Server,Role,E,Msg);
+                {notLeader,_,_} ->
+                    loop(Server,Role,E,Msg);
+                {ack,_,_} ->
+                    loop(Server,Role,E,Msg);
+                {ldr,_,_,_} ->
+                    loop(Server,Role,E,Msg);
+                {normQ,_,_} ->
+                    loop(Server,Role,E,Msg);
+                {notNorm,T,From} ->
+                    case ( (E#election.leader == self()) and
+                           (E#election.elid == T) ) of
+                        true ->
+                            NewE = mon_node(
+                                     E#election{down = E#election.down -- [node(From)]},
+                                     From),
+                            {ok,Synch,NewState} = Mod:elected(State,NewE,node(From)),
+                            From ! {ldr,Synch,NewE#election.elid, E#election.worker_nodes,self()},
+                            broadcast_candidates(NewE, Synch, [From]),
+                            loop(Server#server{state = NewState},Role,NewE,Msg);
+                        false ->
+                            loop(Server,Role,E,Msg)
+                    end;
+                {workerAlive,_,_} ->
+                    %% Do nothing if we get this from a new leader
+                    %% We will soon notice that the prev leader has died, and
+                    %%get the same message again when we are back in safe_loop!
+                    loop(Server,Role,E,Msg);
+                {activateWorker,_,_,_} ->
+                    %% We ignore this, we are already active...
+                    %% It must be an old message!
+                    loop(Server,Role,E,Msg);
+                {workerIsAlive,T,From} ->
+                    case ((T == E#election.elid) and (self() == E#election.leader)
+                          %% and iselem(node(From),E#election.monitored)
+                         ) of
+                        true ->
+                            NewE = mon_node(
+                                     E#election{work_down = E#election.work_down -- [node(From)]},
+                                     From),
+                            %% NewE = E#election{work_down = E#election.work_down -- [node(From)]},
+                            {ok,Synch,NewState} = Mod:elected(State,NewE,node(From)),
+                            From ! {activateWorker,T,Synch,self()},
+                            broadcast_candidates(NewE, Synch, [From]),
+                            loop(Server#server{state = NewState},Role,NewE,Msg);
+                        false ->
+                            loop(Server,Role,E,Msg)
+                    end;
+                {heartbeat, _Node} ->
+                    case (E#election.leader == self()) of
+                        true ->
+                            %% io:format("Got {heartbeat, ~w} - ~w (loop)\n", [_Node, self]),
+                            Candidates = E#election.down -- [lists:nth(1,E#election.candidate_nodes)],
+                            lists:foreach(
+                              fun(N) ->
+                                      Elid = E#election.elid,
+                                      %% io:format("Sent hb to ~w\n", [N]),
+                                      {Name,N} ! {normQ,Elid,self()}
+                              end,Candidates),
+                            lists:foreach(
+                              fun(N) ->
+                                      Elid = E#election.elid,
+                                      {Name,N} ! {workerAlive,Elid,self()}
+                              end,E#election.work_down);
+                        %% timer:send_after(E#election.cand_timer_int,{heartbeat})
+                        false ->
+                            %% io:format("Got {heartbeat, ~w} - ~w (loop)\n", [_Node, E#election.leadernode]),
+                            ok
+                    end,
+                    loop(Server,Role,E,Msg);
+                {candidate_timer} = Msg ->
+                    NewE =
+                        if E#election.down =:= [] orelse (Role =/= elected andalso E#election.leadernode =/= none) ->
+                                timer:cancel(E#election.cand_timer),
+                                %% io:format("Canceling candidate timer (timer=~w\n  down=~w\n  role=~w, leader=~w)\n",
+                                %%    [E#election.cand_timer, E#election.down, Role, E#election.leadernode]),
+                                E#election{cand_timer=undefined};
+                           true ->
+                                %% io:format("Candidate timer - ~w (loop) down: ~w\n", [E#election.leadernode, E#election.down]),
+                                E
+                        end,
+                    %% This shouldn't happen in the leader - just ignore
+                    loop(Server,Role,NewE,Msg);
+                {'DOWN',_Ref,process,From,_Reason} when Role == worker ->
+                    %% We are only monitoring one proc, the leader!
+                    Node = case From of
+                               {Name,_Node} -> _Node;
+                               _ when is_pid(From) -> node(From)
+                           end,
+                    case Node == E#election.leadernode of
+                        true ->
+                            NewE = E#election{ leader = none, leadernode = none,
+                                               status = waiting_worker,
+                                               monitored = []},
+                            safe_loop(Server, waiting_worker, NewE,Msg);
+                        false ->
+                            loop(Server, Role, E,Msg)
+                    end;
+                {'DOWN',Ref,process,From,_Reason} ->
+                    Node = case From of
+                               {Name,_Node} -> _Node;
+                               _ when is_pid(From) -> node(From)
+                           end,
+                    NewMon = E#election.monitored -- [{Ref,Node}],
+                    case lists:member(Node,E#election.candidate_nodes) of
+                        true ->
+                            NewDown = [Node | E#election.down],
+                            E1 = E#election{down = NewDown, monitored = NewMon},
+                            case (Node == E#election.leadernode) of
+                                true ->
+                                    NewE = startStage1(E1),
+                                    safe_loop(Server, candidate, NewE,Msg);
+                                false when E#election.leadernode =:= node() ->
+                                    %% Serge: call handle_DOWN
+                                    {NewState, NewE} =
+                                        case (Server#server.mod):handle_DOWN(Node, Server#server.state, E1) of
+                                            {ok, NewState1} ->
+                                                {NewState1, E1};
+                                            {ok, Synch, NewState1} ->
+                                                {NewState1, broadcast({from_leader,Synch}, E1)}
+                                        end,
+                                    loop(Server#server{state=NewState}, Role, NewE, Msg);
+                                false ->
+                                    loop(Server, Role, E1,Msg)
+                            end;
+                        false ->
+                            %% I am the leader,
+                            %% make sure the dead worker is in work_down.
+                            E1 = E#election{
+                                   monitored = NewMon,
+                                   work_down = [Node |
+                                                (E#election.work_down -- [Node])]
+                                  },
+                            loop(Server, Role, E1,Msg)
+                    end;
+              {add_worker, WorkerNode} ->
+                % io:format("Adding worker ~p~n", [WorkerNode]),
+                case lists:member(WorkerNode, E#election.worker_nodes) of
+                  false ->
+                    {WNodes, DNodes} = {E#election.worker_nodes, E#election.work_down},
+                    
+                    loop(Server, Role, E#election{worker_nodes=[WorkerNode|WNodes],
+                                                  work_down=[WorkerNode|DNodes]},
+                         Msg);
+                  true -> % Redundancy, meet the mirror
+                    % io:format("Nothing happens here.~n"),
+                    loop(Server, Role, E, Msg)
+                end;
+              _Msg when Debug == [] ->
+                handle_msg(Msg, Server, Role, E);
+              _Msg ->
+                Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
+                                          E#election.name, {in, Msg}),
+                handle_msg(Msg, Server#server{debug = Debug1}, Role, E)
+            end
+    end.
+
+%%-----------------------------------------------------------------
+%% Callback functions for system messages handling.
+%%-----------------------------------------------------------------
+%% Resume the receive loop once a system message has been processed.
+%% The last argument is the [Mode, Server, Role, E] list; Mode selects
+%% whether safe_loop/4 or loop/4 is re-entered.  The Debug term handed
+%% back by sys is stored in the server record so that debug options
+%% changed while the process was servicing the system message (for
+%% example through sys:trace/2) are not thrown away.
+%% @hidden
+system_continue(_Parent, Debug, [safe, Server, Role, E]) ->
+    safe_loop(Server#server{debug = Debug}, Role, E,{});
+system_continue(_Parent, Debug, [normal, Server, Role, E]) ->
+    loop(Server#server{debug = Debug}, Role, E,{}).
+
+%% Terminate on request of sys.  No client message triggered the
+%% shutdown, so an empty list is passed as the "last message".  The
+%% Debug term supplied by sys replaces the one stored in the server
+%% record so that terminate/5 reports the current event log.
+%% @hidden
+system_terminate(Reason, _Parent, Debug, [_Mode, Server, Role, E]) ->
+    terminate(Reason, [], Server#server{debug = Debug}, Role, E).
+
+%% Code-upgrade hook.  Calls Mod:code_change/4 with the callback state
+%% and the election record.  The callback may return {ok, NewState} to
+%% keep the election record or {ok, NewState, NewE} to replace it; any
+%% other result (including a caught exit) is returned unchanged.
+%% @hidden
+system_code_change([Mode, Server, Role, E], _Module, OldVsn, Extra) ->
+    #server{mod = Mod, state = State} = Server,
+    Rebuild = fun(S, Election) ->
+                      {ok, [Mode, Server#server{state = S}, Role, Election]}
+              end,
+    case catch Mod:code_change(OldVsn, State, E, Extra) of
+        {ok, NewState, NewE} -> Rebuild(NewState, NewE);
+        {ok, NewState}       -> Rebuild(NewState, E);
+        Other                -> Other
+    end.
+
+%%-----------------------------------------------------------------
+%% Format debug messages.  Print them as the call-back module sees
+%% them, not as the real erlang messages.  Use trace for that.
+%%-----------------------------------------------------------------
+%% One clause per event shape.  Incoming calls and casts are unwrapped
+%% so that the text shows the request as the callback module sees it;
+%% anything else falls through to the generic clauses at the end.
+%% @hidden
+print_event(Dev, {in, {'$gen_call', {From, _Tag}, Call}}, Name) ->
+    io:format(Dev, "*DBG* ~p got local call ~p from ~w~n",
+              [Name, Call, From]);
+print_event(Dev, {in, {'$leader_call', {From, _Tag}, Call}}, Name) ->
+    io:format(Dev, "*DBG* ~p got global call ~p from ~w~n",
+              [Name, Call, From]);
+print_event(Dev, {in, {'$gen_cast', Cast}}, Name) ->
+    io:format(Dev, "*DBG* ~p got local cast ~p~n",
+              [Name, Cast]);
+print_event(Dev, {in, {'$leader_cast', Cast}}, Name) ->
+    io:format(Dev, "*DBG* ~p got global cast ~p~n",
+              [Name, Cast]);
+print_event(Dev, {in, Msg}, Name) ->
+    io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg]);
+print_event(Dev, {out, Msg, To, State}, Name) ->
+    io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n",
+              [Name, Msg, To, State]);
+print_event(Dev, {noreply, State}, Name) ->
+    io:format(Dev, "*DBG* ~p new state ~w~n", [Name, State]);
+print_event(Dev, Event, Name) ->
+    io:format(Dev, "*DBG* ~p dbg  ~p~n", [Name, Event]).
+
+
+%% handle_msg(Msg, Server, Role, E)
+%%
+%% Dispatches a message to the callback module and continues in
+%% loop/4 (or terminates).  Clause order matters:
+%%   * '$leader_call' / '$leader_cast' are served locally only when
+%%     Role is 'elected'; otherwise they are forwarded to the pid in
+%%     E#election.leader.
+%%   * {from_leader, Cmd} is passed to Mod:from_leader/3 after the
+%%     list of down candidates has been refreshed.
+%%   * {Ref, {leader,reply,Reply}} is the answer to a forwarded leader
+%%     call and is relayed to the original caller.
+%%   * '$gen_call' / '$gen_cast' go to Mod:handle_call/4 and
+%%     Mod:handle_cast/3.
+%%   * everything else goes to Mod:handle_info/2.
+handle_msg({'$leader_call', From, Request} = Msg,
+           #server{mod = Mod, state = State} = Server, elected = Role, E) ->
+    case catch Mod:handle_leader_call(Request, From, State, E) of
+        {reply, Reply, NState} ->
+            NewServer = reply(From, {leader,reply,Reply},
+                              Server#server{state = NState}, Role, E),
+            loop(NewServer, Role, E,Msg);
+        {reply, Reply, Broadcast, NState} ->
+            %% Push Broadcast to the monitored nodes before replying.
+            NewE = broadcast({from_leader,Broadcast}, E),
+            NewServer = reply(From, {leader,reply,Reply},
+                              Server#server{state = NState}, Role,
+                              NewE),
+            loop(NewServer, Role, NewE,Msg);
+        {noreply, NState} = Reply ->
+            NewServer = handle_debug(Server#server{state = NState},
+                                     Role, E, Reply),
+            loop(NewServer, Role, E,Msg);
+        {stop, Reason, Reply, NState} ->
+            %% terminate/5 always exits; catch the exit so the caller
+            %% still gets its reply, then exit with the same reason.
+            {'EXIT', R} =
+                (catch terminate(Reason, Msg,
+                                 Server#server{state = NState},
+                                 Role, E)),
+            reply(From, Reply),
+            exit(R);
+        Other ->
+            handle_common_reply(Other, Msg, Server, Role, E)
+    end;
+handle_msg({from_leader, Cmd} = Msg,
+           #server{mod = Mod, state = State} = Server, Role, E) ->
+    NewE = check_candidates(E),
+    handle_common_reply(catch Mod:from_leader(Cmd, State, NewE),
+                        Msg, Server, Role, NewE);
+handle_msg({'$leader_call', From, Request} = Msg, Server, Role,
+           #election{buffered = Buffered, leader = Leader} = E) ->
+    %% Not the leader: forward the call under a fresh reference and
+    %% remember who asked, so the answer can be relayed later.
+    Ref = make_ref(),
+    Leader ! {'$leader_call', {self(),Ref}, Request},
+    NewBuffered = [{Ref,From}|Buffered],
+    loop(Server, Role, E#election{buffered = NewBuffered},Msg);
+handle_msg({Ref, {leader,reply,Reply}} = Msg, Server, Role,
+           #election{buffered = Buffered} = E) ->
+    %% Answer to a forwarded leader call.  The buffered entry is taken
+    %% out and the pruned election record is the one carried into the
+    %% loop; previously the old record was reused, so answered entries
+    %% were never dropped and the buffer grew without bound.
+    {value, {_,From}, Remaining} = lists:keytake(Ref, 1, Buffered),
+    NewE = E#election{buffered = Remaining},
+    NewServer = reply(From, {leader,reply,Reply}, Server, Role, NewE),
+    loop(NewServer, Role, NewE, Msg);
+handle_msg({'$gen_call', From, Request} = Msg,
+           #server{mod = Mod, state = State} = Server, Role, E) ->
+    case catch Mod:handle_call(Request, From, State, E) of
+        {reply, Reply, NState} ->
+            NewServer = reply(From, Reply,
+                              Server#server{state = NState}, Role, E),
+            loop(NewServer, Role, E, Msg);
+        {noreply, NState} = Reply ->
+            NewServer = handle_debug(Server#server{state = NState},
+                                     Role, E, Reply),
+            loop(NewServer, Role, E, Msg);
+        {stop, Reason, Reply, NState} ->
+            {'EXIT', R} =
+                (catch terminate(Reason, Msg, Server#server{state = NState},
+                                 Role, E)),
+            reply(From, Reply),
+            exit(R);
+        Other ->
+            handle_common_reply(Other, Msg, Server, Role, E)
+    end;
+handle_msg({'$gen_cast',Msg} = Cast,
+           #server{mod = Mod, state = State} = Server, Role, E) ->
+    handle_common_reply(catch Mod:handle_cast(Msg, State, E),
+                        Cast, Server, Role, E);
+handle_msg({'$leader_cast', Msg} = Cast,
+          #server{mod = Mod, state = State} = Server, elected = Role, E) ->
+    case catch Mod:handle_leader_cast(Msg, State, E) of
+        {noreply, NState} ->
+            NewServer = handle_debug(Server#server{state = NState},
+                                     Role, E, Cast),
+            loop(NewServer, Role, E,Cast);
+        Other ->
+            %% Pass the complete cast, as the '$gen_cast' clause does,
+            %% so the "last message" seen by loop/terminate is the
+            %% message that was actually received.
+            handle_common_reply(Other, Cast, Server, Role, E)
+    end;
+handle_msg({'$leader_cast', Msg} = Cast, Server, Role,
+           #election{leader = Leader} = E) ->
+    %% Not the leader: pass the cast on.
+    Leader ! {'$leader_cast', Msg},
+    loop(Server, Role, E, Cast);
+
+handle_msg(Msg,
+           #server{mod = Mod, state = State} = Server, Role, E) ->
+    handle_common_reply(catch Mod:handle_info(Msg, State),
+                        Msg, Server, Role, E).
+
+
+%% Act on a callback return value shared by several message types:
+%%   {noreply, S} | {ok, S}  - store S and keep looping
+%%   {stop, Reason, S}       - terminate with the new state
+%%   {'EXIT', Reason}        - the callback crashed; terminate
+%%   anything else           - terminate with bad2_return_value
+handle_common_reply({Tag, NState} = Reply, Msg, Server, Role, E)
+  when Tag =:= noreply; Tag =:= ok ->
+    Traced = handle_debug(Server#server{state = NState}, Role, E, Reply),
+    loop(Traced, Role, E, Msg);
+handle_common_reply({stop, Reason, NState}, Msg, Server, Role, E) ->
+    terminate(Reason, Msg, Server#server{state = NState}, Role, E);
+handle_common_reply({'EXIT', Reason}, Msg, Server, Role, E) ->
+    terminate(Reason, Msg, Server, Role, E);
+handle_common_reply(Reply, Msg, Server, Role, E) ->
+    terminate({bad2_return_value, Reply}, Msg, Server, Role, E).
+
+
+%% Send Reply to the client via reply/2 and record an 'out' debug
+%% event.  Returns the server record as updated by handle_debug/4.
+reply({To, _Tag} = Client, Reply, #server{state = State} = Server, Role, E) ->
+    reply(Client, Reply),
+    Event = {out, Reply, To, State},
+    handle_debug(Server, Role, E, Event).
+
+
+%% Feed Event to sys:handle_debug/4 when debugging is enabled and
+%% return the server record holding the updated debug structure.
+%% With no debug options the record is returned untouched.
+handle_debug(#server{debug = Debug} = Server, _Role, E, Event) ->
+    case Debug of
+        [] ->
+            Server;
+        _ ->
+            Updated = sys:handle_debug(Debug, {?MODULE, print_event},
+                                       E#election.name, Event),
+            Server#server{debug = Updated}
+    end.
+
+%%% ---------------------------------------------------
+%%% Terminate the server.
+%%% ---------------------------------------------------
+
+%% Cancel the candidate timer, give the callback module a chance to
+%% clean up, and exit.  This function never returns:
+%%   * if Mod:terminate/2 itself fails, that failure is reported and
+%%     becomes the exit reason;
+%%   * 'normal' and 'shutdown' exit without an error report;
+%%   * any other reason is reported before exiting with it.
+terminate(Reason, Msg,
+          #server{mod = Mod, state = State, debug = Debug}, _Role,
+          #election{name = Name, cand_timer = Timer}) ->
+    timer:cancel(Timer),
+    case catch Mod:terminate(Reason, State) of
+        {'EXIT', CallbackReason} ->
+            error_info(CallbackReason, Name, Msg, State, Debug),
+            exit(CallbackReason);
+        _ when Reason =:= normal; Reason =:= shutdown ->
+            exit(Reason);
+        _ ->
+            error_info(Reason, Name, Msg, State, Debug),
+            exit(Reason)
+    end.
+
+%% Emit a termination report (name, last message, callback state and
+%% reason) through format/2 and print the sys debug log.
+%% Maybe we shouldn't do this?  We have the crash report...
+error_info(Reason, Name, Msg, State, Debug) ->
+    Report = "** Generic leader ~p terminating \n"
+             "** Last message in was ~p~n"
+             "** When Server state == ~p~n"
+             "** Reason for termination == ~n** ~p~n",
+    Args = [Name, Msg, State, Reason],
+    format(Report, Args),
+    sys:print_log(Debug),
+    ok.
+
+%%% ---------------------------------------------------
+%%% Misc. functions.
+%%% ---------------------------------------------------
+
+%% Look up option Op in a list of options.  Returns {ok, Value} for
+%% the first {Op, Value} pair found, or false when there is none.
+%% Elements that are not a matching pair are skipped.
+opt(_, []) ->
+    false;
+opt(Op, [Head | Rest]) ->
+    case Head of
+        {Op, Value} -> {ok, Value};
+        _           -> opt(Op, Rest)
+    end.
+
+%% Build the sys debug structure from the {debug, Options} entry of
+%% the start options; a missing entry is treated as an empty list.
+debug_options(Name, Opts) ->
+    Requested =
+        case opt(debug, Opts) of
+            {ok, Options} -> Options;
+            _             -> []
+        end,
+    dbg_options(Name, Requested).
+
+%% When no debug options were requested, fall back to
+%% [log, statistics] if the generic_debug flag is known to
+%% init:get_argument/1, and to no options otherwise.  Explicitly
+%% requested options are used as they are.
+dbg_options(Name, Opts) ->
+    FromFlag = fun() ->
+                       case init:get_argument(generic_debug) of
+                           error -> [];
+                           _     -> [log, statistics]
+                       end
+               end,
+    case Opts of
+        [] -> dbg_opts(Name, FromFlag());
+        _  -> dbg_opts(Name, Opts)
+    end.
+
+%% Convert Opts with sys:debug_options/1.  If the conversion exits,
+%% a notice is written through format/2 and debugging is disabled by
+%% returning [].
+dbg_opts(Name, Opts) ->
+    Parsed = (catch sys:debug_options(Opts)),
+    case Parsed of
+        {'EXIT',_} ->
+            format("~p: ignoring erroneous debug options - ~p~n",
+                   [Name, Opts]),
+            [];
+        _ ->
+            Parsed
+    end.
+
+%%-----------------------------------------------------------------
+%% Status information
+%%-----------------------------------------------------------------
+%% Build the status report.  The header and the generic data section
+%% are produced here; the callback-specific section comes from
+%% Mod:format_status/2 when the module exports it and the call does
+%% not exit, and otherwise just shows the callback state.
+%% @hidden
+format_status(Opt, StatusData) ->
+    [PDict, SysState, Parent, Debug, [_Mode, Server, _Role, E]] = StatusData,
+    Header = lists:concat(["Status for generic server ", E#election.name]),
+    Log = sys:get_debug(log, Debug, []),
+    #server{mod = Mod, state = State} = Server,
+    Default = [{data, [{"State", State}]}],
+    Specific =
+        case erlang:function_exported(Mod, format_status, 2) of
+            true ->
+                case catch Mod:format_status(Opt, [PDict, State]) of
+                    {'EXIT', _} -> Default;
+                    Custom      -> Custom
+                end;
+            _ ->
+                Default
+        end,
+    Generic = {data, [{"Status", SysState},
+                      {"Parent", Parent},
+                      {"Logged events", Log}]},
+    [{header, Header}, Generic | Specific].
+
+
+%%-----------------------------------------------------------------
+%% Leader-election functions
+%%-----------------------------------------------------------------
+
+%% Corresponds to startStage1 in Figure 1 in the Stoller-article.
+%% Starts a new election: a fresh election id is built from this
+%% node's position in the candidate list, the incarnation and the
+%% election counter; the counter is bumped and the down-list cleared.
+%% The first candidate proceeds directly to stage 2, every other
+%% candidate starts monitoring the candidates listed before it.
+startStage1(E) ->
+    Candidates = E#election.candidate_nodes,
+    MyPos = pos(node(), Candidates),
+    Counter = E#election.nextel,
+    Started = E#election{
+                elid = {MyPos, E#election.incarn, Counter},
+                nextel = Counter + 1,
+                down = [],
+                status = elec1},
+    if
+        MyPos =:= 1 ->
+            startStage2(Started);
+        true ->
+            mon_nodes(Started, lesser(node(), Candidates))
+    end.
+
+%% Corresponds to startStage2.  Resets the acknowledgement list,
+%% marks this node as the pending-ack position and continues with
+%% continStage2/1.
+startStage2(E) ->
+    Stage2 = E#election{status = elec2, pendack = node(), acks = []},
+    continStage2(Stage2).
+
+%% Advance stage 2 by one candidate.  While the pending-ack node is
+%% not the last candidate, the next candidate is monitored and sent a
+%% halt message; once the end of the list is reached this process
+%% records itself as leader.
+continStage2(E) ->
+    Candidates = E#election.candidate_nodes,
+    Current = E#election.pendack,
+    case pos(Current, Candidates) < length(Candidates) of
+        false ->
+            %% No candidate left to halt: I am the leader.
+            E#election{leader = self(),
+                       leadernode = node(),
+                       status = norm};
+        true ->
+            Pendack = next(Current, Candidates),
+            Monitoring = mon_nodes(E, [Pendack]),
+            {E#election.name, Pendack} ! {halt, E#election.elid, self()},
+            Monitoring#election{pendack = Pendack}
+    end.
+
+%% Corresponds to Halting.  Start monitoring the process From, adopt
+%% its election id T, enter the 'wait' status with From's node noted
+%% as leader node, and take that node off the down-list.
+halting(E, T, From) ->
+    Monitoring = mon_node(E, From),
+    HaltingNode = node(From),
+    Monitoring#election{elid = T,
+                        status = wait,
+                        leadernode = HaltingNode,
+                        down = E#election.down -- [HaltingNode]}.
+
+%% Start monitoring a list of candidate nodes.  Makes sure the
+%% periodic candidate timer is running, then sends a heartbeat to the
+%% registered process on every listed node except the local one and
+%% monitors it.
+mon_nodes(E, Nodes) ->
+    WithTimer =
+        case E#election.cand_timer of
+            undefined ->
+                {ok, TRef} = timer:send_interval(E#election.cand_timer_int,
+                                                 {candidate_timer}),
+                E#election{cand_timer = TRef};
+            _ ->
+                E
+        end,
+    Self = node(),
+    Contact = fun(ToNode, Acc) ->
+                      Target = {Acc#election.name, ToNode},
+                      Target ! {heartbeat, Self},
+                      mon_node(Acc, Target)
+              end,
+    lists:foldl(Contact, WithTimer, Nodes -- [Self]).
+
+%% Start monitoring one process, given as a pid or as {Name, Node}.
+%% A node already present in the monitored list is not monitored a
+%% second time; otherwise the new {Ref, Node} pair is recorded.
+mon_node(E, Proc) ->
+    Node = case Proc of
+               {_Name, RemoteNode}  -> RemoteNode;
+               Pid when is_pid(Pid) -> node(Pid)
+           end,
+    Monitored = E#election.monitored,
+    case iselem(Node, Monitored) of
+        false ->
+            Ref = erlang:monitor(process, Proc),
+            E#election{monitored = [{Ref, Node} | Monitored]};
+        true ->
+            E
+    end.
+
+
+%%%% Stop monitoring of a bunch of nodes
+%%%demon_nodes(E) ->
+%%%    foreach(fun({R,_}) ->
+%%%                    erlang:demonitor(R)
+%%%            end,E#election.monitored),
+%%%    E#election{monitored = []}.
+
+%%% Checks whether this process has become the leader.  If so, the
+%%% callback's elected/3 is invoked, every node that acknowledged is
+%%% told about the new leader, all workers are scheduled for contact,
+%%% the heartbeat timer is armed and the process switches to loop/4.
+%%% Otherwise it stays in safe_loop/4 as a candidate.
+%%%
+%%% NOTE(review): the ldr message sent here has four elements, while
+%%% the ldr messages sent from loop/4 carry the worker node list as an
+%%% extra element - confirm which shape the receiver expects.
+hasBecomeLeader(E,Server,Msg) ->
+    IsLeader = (E#election.status == norm) and (E#election.leader == self()),
+    case IsLeader of
+        false ->
+            safe_loop(Server,candidate,E,Msg);
+        true ->
+            Mod = Server#server.mod,
+            {ok,Synch,NewState} = Mod:elected(Server#server.state,E,undefined),
+            Announcement = {ldr, Synch, E#election.elid, self()},
+            lists:foreach(
+              fun(Node) -> {E#election.name,Node} ! Announcement end,
+              E#election.acks),
+
+            %% Make sure we will try to contact all workers!
+            NewE = E#election{work_down = E#election.worker_nodes},
+
+            %% Set the internal timeout (corresponds to Periodically);
+            %% it is meaningful only when I am the leader.
+            timer:send_after(E#election.cand_timer_int, {heartbeat, node()}),
+            loop(Server#server{state = NewState},elected,NewE,Msg)
+    end.
+
+%%%
+%%% incarnation/3 returns the incarnation number of this node and
+%%% advances the counter kept on disk.  One file per registered name
+%%% and node is kept in VarDir.  If the file cannot be inspected it
+%%% is created holding the value 1 and 0 is returned; otherwise the
+%%% stored value is returned and the file is rewritten with that
+%%% value plus one.  To restart the numbering, remove the files.
+%%%
+%%% Atomicity: This approach is safe as long as there is only
+%%% one gen_leader with a given RegName running per node.
+%%%
+incarnation(VarDir, RegName, Node) ->
+    File = filename:join(VarDir,
+                         atom_to_list(RegName) ++ "_" ++ atom_to_list(Node)),
+    Store = fun(Counter) ->
+                    ok = file:write_file(File, term_to_binary(Counter))
+            end,
+    case file:read_file_info(File) of
+        {ok,_} ->
+            {ok,Bin} = file:read_file(File),
+            Incarn = binary_to_term(Bin),
+            Store(Incarn + 1),
+            Incarn;
+        {error,_Reason} ->
+            Store(1),
+            0
+    end.
+
+
+%% Broadcast Msg to the nodes that are currently being monitored,
+%% i.e. only to nodes that are already known.
+broadcast(Msg, #election{monitored = Monitored} = E) ->
+    broadcast(Msg, [Node || {_Ref, Node} <- Monitored], E).
+
+%% Send a {from_leader, Msg} message to the registered process on
+%% each node in ToNodes.  Returns the election record unchanged.
+broadcast({from_leader, _Payload} = Packet, ToNodes, E) ->
+    _ = [{E#election.name, Node} ! Packet || Node <- ToNodes],
+    E.
+
+%% True when some pair in the list has P as its second element.
+%% List elements that are not pairs never match.
+iselem(P, Pairs) ->
+    lists:any(fun({_, Q}) -> Q =:= P;
+                 (_)      -> false
+              end, Pairs).
+
+%% The elements that come before the first occurrence of N; the
+%% whole list when N does not occur in it.
+lesser(N, Nodes) ->
+    lists:takewhile(fun(M) -> M =/= N end, Nodes).
+
+%% The element that follows the first occurrence of N in the list,
+%% or no_val when N does not occur.  Callers must not ask for the
+%% successor of the last element.
+next(N, Nodes) ->
+    case lists:dropwhile(fun(M) -> M =/= N end, Nodes) of
+        []         -> no_val;
+        [N | Rest] -> lists:nth(1, Rest)
+    end.
+
+%% 1-based position of the first occurrence of N1 in the list.  When
+%% N1 is absent the result is 100000 plus the length of the list, so
+%% an unknown node always ranks after every known one.
+pos(N1, Nodes) ->
+    case lists:splitwith(fun(M) -> M =/= N1 end, Nodes) of
+        {Before, []}      -> 100000 + length(Before);
+        {Before, [_ | _]} -> length(Before) + 1
+    end.
+
+%% Re-evaluate the down-list: a node stays on it unless
+%% net_kernel:node_info/2 reports its state as up.  The alive list is
+%% set to the candidate nodes minus the nodes still down.
+check_candidates(#election{down = Down} = E) ->
+    StillDown = lists:filter(
+                  fun(N) -> net_kernel:node_info(N, state) =/= {ok, up} end,
+                  Down),
+    E#election{down = StillDown,
+               alive = E#election.candidate_nodes -- StillDown}.
+
+%% When the election uses bcast_type 'all', send {from_leader, Synch}
+%% to every monitored node except those in IgnoreNodes.
+%%
+%% IgnoreNodes may contain node names or pids.  The callers in loop/4
+%% pass the pid of the process that has just been sent Synch directly,
+%% whereas the monitored list holds node names; pids are therefore
+%% translated to their node first.  Without that translation the
+%% subtraction removed nothing and the joining node received the data
+%% a second time through the broadcast.
+%%
+%% Returns the election record when a broadcast took place and 'ok'
+%% otherwise.
+broadcast_candidates(E, Synch, IgnoreNodes) ->
+    case E#election.bcast_type of
+        all ->
+            ToNode = fun(P) when is_pid(P) -> node(P);
+                        (N)                -> N
+                     end,
+            Ignored = lists:map(ToNode, IgnoreNodes),
+            Nodes = [N || {_,N} <- E#election.monitored] -- Ignored,
+            broadcast({from_leader, Synch}, Nodes, E);
+        _ ->
+            ok
+    end.


More information about the scm-commits mailing list