Class NonBlockingCoordinator
java.lang.Object
  org.apache.catalina.tribes.group.ChannelInterceptorBase
    org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator
All Implemented Interfaces:
ChannelInterceptor, Heartbeat, MembershipListener
public class NonBlockingCoordinator extends ChannelInterceptorBase
Title: Auto merging leader election algorithm
Description: Implementation of a simple coordinator algorithm that not only selects a coordinator, it also merges groups automatically when members are discovered that weren't part of the
This algorithm is non-blocking, meaning it allows transactions while the coordination phase is going on.
This implementation is based on a home-brewed algorithm that uses the AbsoluteOrder of a membership to pass a token ring of the current membership.
This is not the same as just using AbsoluteOrder! Consider the following scenario:
Nodes A,B,C,D,E are on a network, in that priority order. AbsoluteOrder only works if all nodes receive pings from all the other nodes, meaning that node{i} receives pings from node{all}-node{i}.
But the following could happen if a multicast problem occurs:
A has members {B,C,D}
B has members {A,C}
C has members {D,E}
D has members {A,B,C,E}
E has members {A,C,D}
The default Tribes membership implementation relies on multicast packets arriving at all nodes correctly, and nothing guarantees that they will.
To best explain how this algorithm works, let's take the above example. For simplicity we assume that a send operation is O(1) for all nodes, although this algorithm will work where messages overlap, as they all depend on absolute order.
Scenario 1: A,B,C,D,E all come online at the same time.
Eval phase: A thinks of itself as leader, B thinks of A as leader, C thinks of itself as leader, D and E think of A as leader.
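The eval phase above can be reproduced with a small standalone sketch. It is not the Tribes implementation: the class `EvalPhaseSketch` and the helper `leaderSeenBy` are hypothetical names, and natural String order is assumed as a stand-in for AbsoluteOrder priority.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

final class EvalPhaseSketch {

    // Hypothetical helper: a node's candidate leader is the highest-priority
    // member among itself and the members it can currently see. Natural
    // String order stands in for AbsoluteOrder (A before B before C ...).
    static String leaderSeenBy(String self, List<String> visible) {
        TreeSet<String> candidates = new TreeSet<>(visible);
        candidates.add(self);
        return candidates.first();
    }

    public static void main(String[] args) {
        // The partial views from the multicast-problem example.
        Map<String, List<String>> views = new LinkedHashMap<>();
        views.put("A", List.of("B", "C", "D"));
        views.put("B", List.of("A", "C"));
        views.put("C", List.of("D", "E"));
        views.put("D", List.of("A", "B", "C", "E"));
        views.put("E", List.of("A", "C", "D"));

        // Prints one line per node; only C picks itself instead of A,
        // because C cannot see A or B.
        views.forEach((node, seen) ->
                System.out.println(node + " thinks the leader is " + leaderSeenBy(node, seen)));
    }
}
```

Under these assumptions C is the only node that disagrees about the leader, which is why two tokens (X from A and Y from C) start circulating in the token phase.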
Token phase:
(1) A sends out a message X{A-ldr, A-src, mbrs-A,B,C,D} to B where X is the id for the message(and the view)
(1) C sends out a message Y{C-ldr, C-src, mbrs-C,D,E} to D where Y is the id for the message(and the view)
(2) B receives X{A-ldr, A-src, mbrs-A,B,C,D}, sends X{A-ldr, A-src, mbrs-A,B,C,D} to C
(2) D receives Y{C-ldr, C-src, mbrs-C,D,E} D is aware of A,B, sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to E
(3) C receives X{A-ldr, A-src, mbrs-A,B,C,D}, sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to D
(3) E receives Y{A-ldr, C-src, mbrs-A,B,C,D,E} sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to A
(4) D receives X{A-ldr, A-src, mbrs-A,B,C,D,E}, sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to A
(4) A receives Y{A-ldr, C-src, mbrs-A,B,C,D,E}, holds the message, adds E to its list of members
(5) A receives X{A-ldr, A-src, mbrs-A,B,C,D,E}
At this point, the state looks like
A - {A-ldr, mbrs-A,B,C,D,E, id=X}
B - {A-ldr, mbrs-A,B,C,D, id=X}
C - {A-ldr, mbrs-A,B,C,D,E, id=X}
D - {A-ldr, mbrs-A,B,C,D,E, id=X}
E - {A-ldr, mbrs-A,B,C,D,E, id=Y}
A message doesn't stop until it reaches its original sender, unless it's dropped by a higher leader. As you can see, E still thinks the viewId=Y, which is not correct. But at this point we have arrived at the same membership and all nodes are informed of each other.
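The merge that each node applies when a token arrives, as in step (2) where D turns Y{C-ldr, ...} into Y{A-ldr, ...}, can be sketched as follows. This is an illustration only: `TokenMergeSketch`, `Token` and `mergeOnArrival` are hypothetical names and do not reproduce the class's own mergeOnArrive method, and String order is again assumed in place of AbsoluteOrder.

```java
import java.util.Set;
import java.util.TreeSet;

final class TokenMergeSketch {

    // Hypothetical token shape mirroring the X{ldr, src, mbrs} notation.
    static final class Token {
        final String id;               // message id, doubling as the view id
        final String leader;           // leader proposed so far
        final String source;           // node that started the token
        final TreeSet<String> members; // membership carried by the token

        Token(String id, String leader, String source, Set<String> members) {
            this.id = id;
            this.leader = leader;
            this.source = source;
            this.members = new TreeSet<>(members);
        }

        @Override
        public String toString() {
            return id + "{" + leader + "-ldr, " + source + "-src, mbrs-"
                    + String.join(",", members) + "}";
        }
    }

    // Union the token's members with the receiver and its local view, then
    // re-pick the leader as the highest-priority member of the merged set.
    // The id and the original source are left untouched.
    static Token mergeOnArrival(Token in, String self, Set<String> localView) {
        TreeSet<String> merged = new TreeSet<>(in.members);
        merged.add(self);
        merged.addAll(localView);
        return new Token(in.id, merged.first(), in.source, merged);
    }

    public static void main(String[] args) {
        // Step (2): D receives Y from C while being aware of A and B.
        Token y = new Token("Y", "C", "C", Set.of("C", "D", "E"));
        Token forwarded = mergeOnArrival(y, "D", Set.of("A", "B", "C", "E"));
        System.out.println(forwarded); // Y{A-ldr, C-src, mbrs-A,B,C,D,E}
    }
}
```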
To synchronize the rest we simply perform the following check at A when A receives X:
Original X{A-ldr, A-src, mbrs-A,B,C,D} == Arrived X{A-ldr, A-src, mbrs-A,B,C,D,E}
Since the condition is false, A resends the token: A sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to B. When A receives X again, the token is complete.
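The completion check at the originator amounts to comparing the membership it sent with the membership that came back. A minimal sketch, assuming memberships can be compared as plain sets (the class `TokenCompletionSketch` and the method `isComplete` are hypothetical names):

```java
import java.util.Set;

final class TokenCompletionSketch {

    // Hypothetical check run by the originator when its own token returns:
    // the token is complete only if no node added members along the way.
    static boolean isComplete(Set<String> sentMembers, Set<String> arrivedMembers) {
        return sentMembers.equals(arrivedMembers);
    }

    public static void main(String[] args) {
        Set<String> original = Set.of("A", "B", "C", "D");
        Set<String> arrived = Set.of("A", "B", "C", "D", "E");

        // First lap: E was added on the way, so A must resend the token.
        System.out.println(isComplete(original, arrived)); // false

        // Second lap: A sent the merged membership and got it back unchanged.
        System.out.println(isComplete(arrived, arrived));  // true
    }
}
```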
Optionally, A can send a message X{A-ldr, A-src, mbrs-A,B,C,D,E confirmed} to A,B,C,D,E who then install and accept the view.
Let's assume that C1 arrives. C1 has lower priority than C, but higher priority than D.
Let's also assume that C1 sees the following view {B,D,E}
C1 waits for a token to arrive. When the token arrives, the same scenario as above will happen.
In the scenario where C1 sees {D,E} and A,B,C cannot see C1, no token will ever arrive.
In this case, C1 sends a Z{C1-ldr, C1-src, mbrs-C1,D,E} to D
D receives Z{C1-ldr, C1-src, mbrs-C1,D,E} and sends Z{A-ldr, C1-src, mbrs-A,B,C,C1,D,E} to E
E receives Z{A-ldr, C1-src, mbrs-A,B,C,C1,D,E} and sends it to A
A sends Z{A-ldr, A-src, mbrs-A,B,C,C1,D,E} to B and the chain continues until A receives the token again. At that time A optionally sends out Z{A-ldr, A-src, mbrs-A,B,C,C1,D,E, confirmed} to A,B,C,C1,D,E
To ensure that the view gets implemented at all nodes at the same time, A will send out a VIEW_CONF message; this is the 'confirmed' message that is optional above.
Ideally, the interceptor below this one would be the TcpFailureDetector to ensure correct memberships.
The example above can, of course, be simplified with a finite state machine:
But I suck at writing state machines, my head gets all confused. One day I will document this algorithm though.
Maybe I'll do a state diagram :)
State Diagrams
Initiate an election
Receive an election message
Version:
1.0
Nested Class Summary
Nested Classes (Modifier and Type, Class)
static class NonBlockingCoordinator.CoordinationEvent
static class NonBlockingCoordinator.CoordinationMessage
Nested classes/interfaces inherited from interface org.apache.catalina.tribes.ChannelInterceptor
ChannelInterceptor.InterceptorEvent
Field Summary
Fields (Modifier and Type, Field, Description)
protected static byte[] COORD_ALIVE - Alive message
protected static byte[] COORD_CONF - Coordination confirmation, for blocking installations
protected static byte[] COORD_HEADER - Header for a coordination message
protected static byte[] COORD_REQUEST - Coordination request
protected java.util.concurrent.atomic.AtomicBoolean coordMsgReceived
protected java.lang.Object electionMutex
protected Membership membership - Our nonblocking membership
protected static StringManager sm
protected boolean started
protected int startsvc
protected Membership suggestedView
protected UniqueId suggestedviewId - Indicates that we are running an election, and this is the one we are running
protected Membership view - Our current view
protected UniqueId viewId - Our current viewId
protected long waitForCoordMsgTimeout - Time to wait for coordination timeout
Fields inherited from class org.apache.catalina.tribes.group.ChannelInterceptorBase
optionFlag
-
Constructor Summary
Constructors
NonBlockingCoordinator()
Method Summary
All Methods (Modifier and Type, Method, Description)
protected boolean alive(Member mbr)
ChannelData createData(NonBlockingCoordinator.CoordinationMessage msg, Member local)
void fireInterceptorEvent(ChannelInterceptor.InterceptorEvent event)
Member getCoordinator() - Returns coordinator if one is available
Member getLocalMember(boolean incAlive) - Return the member that represents this node.
Member getMember(Member mbr) - Intercepts the Channel.getMember(Member) method
Member[] getMembers() - Get all current cluster members
Member[] getView()
UniqueId getViewId()
protected void halt() - Block in/out messages while an election is going on
protected void handleMyToken(Member local, NonBlockingCoordinator.CoordinationMessage msg, Membership merged)
protected void handleOtherToken(Member local, NonBlockingCoordinator.CoordinationMessage msg, Membership merged)
protected void handleToken(NonBlockingCoordinator.CoordinationMessage msg, Membership merged)
protected void handleViewConf(NonBlockingCoordinator.CoordinationMessage msg, Membership merged)
protected boolean hasHigherPriority(Member[] complete, Member[] local)
boolean hasMembers() - has members
void heartbeat() - The heartbeat() method gets invoked periodically to allow interceptors to clean up resources, time out objects and perform actions that are unrelated to sending/receiving data.
boolean isCoordinator()
boolean isHighest()
protected boolean isViewConf(NonBlockingCoordinator.CoordinationMessage msg)
void memberAdded(Member member) - A member was added to the group
void memberAdded(Member member, boolean elect)
protected boolean memberAlive(Member mbr, long conTimeout)
void memberDisappeared(Member member) - A member was removed from the group. If the member left voluntarily, the Member.getCommand will contain the Member.SHUTDOWN_PAYLOAD data
protected Membership mergeOnArrive(NonBlockingCoordinator.CoordinationMessage msg)
void messageReceived(ChannelMessage msg) - The messageReceived method is invoked when a message is received.
protected void processCoordMessage(NonBlockingCoordinator.CoordinationMessage msg)
protected void release() - Release lock for in/out messages; election is completed
protected void sendElectionMsg(Member local, Member next, NonBlockingCoordinator.CoordinationMessage msg)
protected void sendElectionMsgToNextInline(Member local, NonBlockingCoordinator.CoordinationMessage msg)
void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) - The sendMessage method is called when a message is being sent to one or more destinations.
protected void setupMembership()
void start(int svc) - Starts up the channel.
void startElection(boolean force)
void stop(int svc) - Shuts down the channel.
protected void waitForRelease() - Wait for an election to end
Methods inherited from class org.apache.catalina.tribes.group.ChannelInterceptorBase
getChannel, getNext, getOptionFlag, getPrevious, okToProcess, setChannel, setNext, setOptionFlag, setPrevious
Field Detail
-
sm
protected static final StringManager sm
-
COORD_HEADER
protected static final byte[] COORD_HEADER
header for a coordination message
-
COORD_REQUEST
protected static final byte[] COORD_REQUEST
Coordination request
-
COORD_CONF
protected static final byte[] COORD_CONF
Coordination confirmation, for blocking installations
-
COORD_ALIVE
protected static final byte[] COORD_ALIVE
Alive message
-
waitForCoordMsgTimeout
protected final long waitForCoordMsgTimeout
Time to wait for coordination timeout
- See Also:
- Constant Field Values
-
view
protected volatile Membership view
Our current view
-
viewId
protected UniqueId viewId
Our current viewId
-
membership
protected Membership membership
Our nonblocking membership
-
suggestedviewId
protected UniqueId suggestedviewId
indicates that we are running an election and this is the one we are running
-
suggestedView
protected volatile Membership suggestedView
-
started
protected volatile boolean started
-
startsvc
protected final int startsvc
- See Also:
- Constant Field Values
-
electionMutex
protected final java.lang.Object electionMutex
-
coordMsgReceived
protected final java.util.concurrent.atomic.AtomicBoolean coordMsgReceived
-
-
Method Detail
-
startElection
public void startElection(boolean force) throws ChannelException
- Throws:
ChannelException
-
sendElectionMsg
protected void sendElectionMsg(Member local, Member next, NonBlockingCoordinator.CoordinationMessage msg) throws ChannelException
- Throws:
ChannelException
-
sendElectionMsgToNextInline
protected void sendElectionMsgToNextInline(Member local, NonBlockingCoordinator.CoordinationMessage msg) throws ChannelException
- Throws:
ChannelException
-
createData
public ChannelData createData(NonBlockingCoordinator.CoordinationMessage msg, Member local)
-
alive
protected boolean alive(Member mbr)
-
memberAlive
protected boolean memberAlive(Member mbr, long conTimeout)
-
mergeOnArrive
protected Membership mergeOnArrive(NonBlockingCoordinator.CoordinationMessage msg)
-
processCoordMessage
protected void processCoordMessage(NonBlockingCoordinator.CoordinationMessage msg) throws ChannelException
- Throws:
ChannelException
-
handleToken
protected void handleToken(NonBlockingCoordinator.CoordinationMessage msg, Membership merged) throws ChannelException
- Throws:
ChannelException
-
handleMyToken
protected void handleMyToken(Member local, NonBlockingCoordinator.CoordinationMessage msg, Membership merged) throws ChannelException
- Throws:
ChannelException
-
handleOtherToken
protected void handleOtherToken(Member local, NonBlockingCoordinator.CoordinationMessage msg, Membership merged) throws ChannelException
- Throws:
ChannelException
-
handleViewConf
protected void handleViewConf(NonBlockingCoordinator.CoordinationMessage msg, Membership merged) throws ChannelException
- Throws:
ChannelException
-
isViewConf
protected boolean isViewConf(NonBlockingCoordinator.CoordinationMessage msg)
-
getCoordinator
public Member getCoordinator()
Returns coordinator if one is available
- Returns:
- Member
-
getView
public Member[] getView()
-
getViewId
public UniqueId getViewId()
-
halt
protected void halt()
Block in/out messages while an election is going on
-
release
protected void release()
Release lock for in/out messages; election is completed
-
waitForRelease
protected void waitForRelease()
Wait for an election to end
-
start
public void start(int svc) throws ChannelException
Description copied from class: ChannelInterceptorBase
Starts up the channel. This can be called multiple times for individual services to start. The svc parameter can be the logical OR value of any of the constants.
- Specified by:
start in interface ChannelInterceptor
- Overrides:
start in class ChannelInterceptorBase
- Parameters:
svc - int value of
DEFAULT - will start all services
MBR_RX_SEQ - starts the membership receiver
MBR_TX_SEQ - starts the membership broadcaster
SND_TX_SEQ - starts the replication transmitter
SND_RX_SEQ - starts the replication receiver
- Throws:
ChannelException - if a startup error occurs or the service is already started.
- See Also:
Channel
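How a combined svc value works can be illustrated with plain bit flags. The sketch below is standalone and does not use the Tribes classes: the constants are stand-ins named after the Channel constants listed above, their numeric values are assumptions made for illustration, and `includes` is a hypothetical helper.

```java
final class ServiceFlagsSketch {

    // Stand-in bit flags named after the Channel constants. The numeric
    // values are assumptions for illustration; consult Channel for the
    // real definitions.
    static final int SND_RX_SEQ = 1;
    static final int SND_TX_SEQ = 2;
    static final int MBR_RX_SEQ = 4;
    static final int MBR_TX_SEQ = 8;
    static final int DEFAULT = SND_RX_SEQ | SND_TX_SEQ | MBR_RX_SEQ | MBR_TX_SEQ;

    // Hypothetical helper: true if the combined svc value contains the flag.
    static boolean includes(int svc, int flag) {
        return (svc & flag) == flag;
    }

    public static void main(String[] args) {
        // Request only the two membership services.
        int svc = MBR_RX_SEQ | MBR_TX_SEQ;

        System.out.println(includes(svc, MBR_RX_SEQ));     // true
        System.out.println(includes(svc, SND_TX_SEQ));     // false
        System.out.println(includes(DEFAULT, SND_RX_SEQ)); // true
    }
}
```

With flags of this shape, a caller would pass something like `MBR_RX_SEQ | MBR_TX_SEQ` to start(int svc) to start a subset of services, or DEFAULT to start all of them.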
-
stop
public void stop(int svc) throws ChannelException
Description copied from class: ChannelInterceptorBase
Shuts down the channel. This can be called multiple times for individual services to shut down. The svc parameter can be the logical OR value of any of the constants.
- Specified by:
stop in interface ChannelInterceptor
- Overrides:
stop in class ChannelInterceptorBase
- Parameters:
svc - int value of
DEFAULT - will shutdown all services
MBR_RX_SEQ - stops the membership receiver
MBR_TX_SEQ - stops the membership broadcaster
SND_TX_SEQ - stops the replication transmitter
SND_RX_SEQ - stops the replication receiver
- Throws:
ChannelException - if a startup error occurs or the service is already started.
- See Also:
Channel
-
sendMessage
public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException
Description copied from interface: ChannelInterceptor
The sendMessage method is called when a message is being sent to one or more destinations. The interceptor can modify any of the parameters and then pass the message down the stack by invoking getNext().sendMessage(destination,msg,payload).
Alternatively, the interceptor can stop the message from being sent by not invoking getNext().sendMessage(destination,msg,payload).
If the message is to be sent asynchronously, the application can be notified of completion and errors by passing in an error handler attached to a payload object.
The ChannelMessage.getAddress contains Channel.getLocalMember, and can be overwritten to simulate a message sent from another node.
- Specified by:
sendMessage in interface ChannelInterceptor
- Overrides:
sendMessage in class ChannelInterceptorBase
- Parameters:
destination - Member[] - the destination for this message
msg - ChannelMessage - the message to be sent
payload - InterceptorPayload - the payload, carrying an error handler and future useful data, can be null
- Throws:
ChannelException - if a serialization error happens.
- See Also:
ErrorHandler, InterceptorPayload
-
messageReceived
public void messageReceived(ChannelMessage msg)
Description copied from interface: ChannelInterceptor
The messageReceived method is invoked when a message is received. ChannelMessage.getAddress() is the sender, or the reply-to address if it has been overwritten.
- Specified by:
messageReceived in interface ChannelInterceptor
- Overrides:
messageReceived in class ChannelInterceptorBase
- Parameters:
msg - ChannelMessage
-
memberAdded
public void memberAdded(Member member)
Description copied from interface: MembershipListener
A member was added to the group
- Specified by:
memberAdded in interface MembershipListener
- Overrides:
memberAdded in class ChannelInterceptorBase
- Parameters:
member - Member - the member that was added
-
memberAdded
public void memberAdded(Member member, boolean elect)
-
memberDisappeared
public void memberDisappeared(Member member)
Description copied from interface: MembershipListener
A member was removed from the group.
If the member left voluntarily, the Member.getCommand will contain the Member.SHUTDOWN_PAYLOAD data.
- Specified by:
memberDisappeared in interface MembershipListener
- Overrides:
memberDisappeared in class ChannelInterceptorBase
- Parameters:
member - Member
- See Also:
Member.SHUTDOWN_PAYLOAD
-
isHighest
public boolean isHighest()
-
isCoordinator
public boolean isCoordinator()
-
heartbeat
public void heartbeat()
Description copied from interface: ChannelInterceptor
The heartbeat() method gets invoked periodically to allow interceptors to clean up resources, time out objects and perform actions that are unrelated to sending/receiving data.
- Specified by:
heartbeat in interface ChannelInterceptor
- Specified by:
heartbeat in interface Heartbeat
- Overrides:
heartbeat in class ChannelInterceptorBase
-
hasMembers
public boolean hasMembers()
has members
- Specified by:
hasMembers in interface ChannelInterceptor
- Overrides:
hasMembers in class ChannelInterceptorBase
- Returns:
- boolean - if the channel has members in its membership group
- See Also:
Channel.hasMembers()
-
getMembers
public Member[] getMembers()
Get all current cluster members
- Specified by:
getMembers in interface ChannelInterceptor
- Overrides:
getMembers in class ChannelInterceptorBase
- Returns:
- all members or empty array
- See Also:
Channel.getMembers()
-
getMember
public Member getMember(Member mbr)
Description copied from interface: ChannelInterceptor
Intercepts the Channel.getMember(Member) method
- Specified by:
getMember in interface ChannelInterceptor
- Overrides:
getMember in class ChannelInterceptorBase
- Parameters:
mbr - Member
- Returns:
- Member
- See Also:
Channel.getMember(Member)
-
getLocalMember
public Member getLocalMember(boolean incAlive)
Return the member that represents this node.
- Specified by:
getLocalMember in interface ChannelInterceptor
- Overrides:
getLocalMember in class ChannelInterceptorBase
- Parameters:
incAlive - boolean
- Returns:
- Member
- See Also:
Channel.getLocalMember(boolean)
-
setupMembership
protected void setupMembership()
-
fireInterceptorEvent
public void fireInterceptorEvent(ChannelInterceptor.InterceptorEvent event)
- Specified by:
fireInterceptorEvent in interface ChannelInterceptor
- Overrides:
fireInterceptorEvent in class ChannelInterceptorBase