Package io.mats3.matssocket.impl
Class ClusterStoreAndForward_SQL
java.lang.Object
io.mats3.matssocket.impl.ClusterStoreAndForward_SQL
- All Implemented Interfaces:
ClusterStoreAndForward
An implementation of CSAF relying on a shared SQL database to store the necessary information in a cluster setting.
NOTE: This CSAF implementation expects that the database tables are in place. A tool is provided for this,
using Flyway:
ClusterStoreAndForward_SQL_DbMigrations
.
NOTE: There is heavy reliance on the Mats' MatsFactory.ContextLocal
feature whereby the current Mats
StageProcessor-contextual SQL Connection is available using the MatsFactory.ContextLocal.getAttribute(Class, String...)
method. This since several of the methods on this interface will be invoked within a Mats initiation or Stage process
lambda, and thus participating in the transactional demarcation established there is vital to achieve the guaranteed
delivery and exactly-once delivery semantics.-
Nested Class Summary
Nested classes/interfaces inherited from interface io.mats3.matssocket.ClusterStoreAndForward
ClusterStoreAndForward.CurrentNode, ClusterStoreAndForward.DataAccessException, ClusterStoreAndForward.MessageIdAlreadyExistsException, ClusterStoreAndForward.RequestCorrelation, ClusterStoreAndForward.SimpleCurrentNode, ClusterStoreAndForward.SimpleRequestCorrelation, ClusterStoreAndForward.SimpleStoredInMessage, ClusterStoreAndForward.SimpleStoredOutMessage, ClusterStoreAndForward.StoredInMessage, ClusterStoreAndForward.StoredOutMessage, ClusterStoreAndForward.WrongUserException
-
Constructor Summary
ModifierConstructorDescriptionprotected
ClusterStoreAndForward_SQL
(DataSource dataSource, String nodename, Clock clock) -
Method Summary
Modifier and TypeMethodDescriptionvoid
boot()
Start theClusterStoreAndForward
, perform DB preparations and migrations.void
closeSession
(String matsSocketSessionId) Invoked when the client explicitly tells us that he closed this session, CLOSE_SESSION.static ClusterStoreAndForward_SQL
create
(DataSource dataSource, String nodename) void
deleteMessageIdsFromInbox
(String matsSocketSessionId, Collection<String> clientMessageIds) Deletes the incoming message Ids, as we've established that the client will never try to send this particular message again.void
deregisterSessionFromThisNode
(String matsSocketSessionId, String connectionId) Deregisters a Session home when a WebSocket is closed.getAndDeleteRequestCorrelation
(String matsSocketSessionId, String serverMessageId) getCurrentRegisteredNodeForSession
(String matsSocketSessionId) getMessageFromInbox
(String matsSocketSessionId, String clientMessageId) getMessagesFromOutbox
(String matsSocketSessionId, int maxNumberOfMessages) Fetch a set of messages, up to 'maxNumberOfMessages' - but do not include messages that have been attempted delivered already (marked withClusterStoreAndForward.outboxMessagesAttemptedDelivery(String, Collection)
).getSessions
(boolean onlyActive, String userId, String appName, String appVersionAtOrAbove) Direct implementation ofMatsSocketServer.getMatsSocketSessions(boolean, String, String, String)
- go read there for semantics.int
getSessionsCount
(boolean onlyActive, String userId, String appName, String appVersionAtOrAbove) Direct implementation ofMatsSocketServer.getMatsSocketSessionsCount(boolean, String, String, String)
- go read there for semantics.boolean
isSessionExists
(String matsSocketSessionId) void
notifySessionLiveliness
(Collection<String> matsSocketSessionIds) Shall be invoked on some kind of schedule (e.g.void
outboxMessagesAttemptedDelivery
(String matsSocketSessionId, Collection<String> serverMessageIds) Marks the specified messages as attempted delivered and notches theClusterStoreAndForward.StoredOutMessage.getDeliveryCount()
one up.void
outboxMessagesComplete
(String matsSocketSessionId, Collection<String> serverMessageIds) States that the messages are delivered.void
outboxMessagesDeadLetterQueue
(String matsSocketSessionId, Collection<String> serverMessageIds) States that the messages overran the accepted number of delivery attempts.void
outboxMessagesUnmarkAttemptedDelivery
(String matsSocketSessionId) When this method is invoked,ClusterStoreAndForward.getMessagesFromOutbox(String, int)
will again return messages that has previously been marked as attempted delivered withClusterStoreAndForward.outboxMessagesAttemptedDelivery(String, Collection)
.long
registerSessionAtThisNode
(String matsSocketSessionId, String userId, String connectionId, String clientLibAndVersions, String appName, String appVersion) Registers a Session home to this node - only one node can ever be home, so any old is deleted.int
Shall be invoked on some kind of schedule (e.g.void
storeMessageIdInInbox
(String matsSocketSessionId, String clientMessageId) Stores the incoming message Id, to avoid double delivery.storeMessageInOutbox
(String matsSocketSessionId, String serverMessageId, String clientMessageId, String traceId, MatsSocketServer.MessageType type, Long requestTimestamp, String envelope, String messageJson, byte[] messageBinary) Stores the message for the Session, returning the nodename for the node holding the session, if any.void
storeRequestCorrelation
(String matsSocketSessionId, String serverMessageId, long requestTimestamp, String replyTerminatorId, String correlationString, byte[] correlationBinary) timeoutSessions
(long notLiveSinceTimestamp) Shall be invoked on some kind of schedule (e.g.void
updateMessageInInbox
(String matsSocketSessionId, String clientMessageId, String envelopeWithMessage, byte[] messageBinary) Stores the resulting message (envelope and binary), so that if the incoming messages comes again (based onClusterStoreAndForward.storeMessageIdInInbox(String, String)
throwingClusterStoreAndForward.MessageIdAlreadyExistsException
), the result from the previous processing can be returned right away.
-
Constructor Details
-
ClusterStoreAndForward_SQL
-
-
Method Details
-
create
-
boot
public void boot()Description copied from interface:ClusterStoreAndForward
Start theClusterStoreAndForward
, perform DB preparations and migrations.- Specified by:
boot
in interfaceClusterStoreAndForward
-
registerSessionAtThisNode
public long registerSessionAtThisNode(String matsSocketSessionId, String userId, String connectionId, String clientLibAndVersions, String appName, String appVersion) throws ClusterStoreAndForward.WrongUserException, ClusterStoreAndForward.DataAccessException Description copied from interface:ClusterStoreAndForward
Registers a Session home to this node - only one node can ever be home, so any old is deleted. When "re-registering" a session, it is asserted that the provided 'userId' is the same UserId as originally registered - aClusterStoreAndForward.WrongUserException
is thrown if this does not match.- Specified by:
registerSessionAtThisNode
in interfaceClusterStoreAndForward
- Parameters:
matsSocketSessionId
- the SessionId for this connection.userId
- the UserId, as provided byAuthenticationPlugin
, that own this MatsSocketSessionIdconnectionId
- an id that is unique for this specific WebSocket Session (i.e. TCP Connection), so that if it closes, a new registration will not be deregistered by the old MatsSocketSession realizing that it is closed and then invokingClusterStoreAndForward.deregisterSessionFromThisNode(String, String)
clientLibAndVersions
- the Client Library and Versions + runtime information for the Client.appName
- the AppName of the accessing Client appappVersion
- the AppVersion of the accessing Client app- Returns:
- the created timestamp - which is either "now" if this is the first register, or if it a "reconnect", when this MatsSocketSession was initially created
- Throws:
ClusterStoreAndForward.WrongUserException
- if the userId provided does not match the original userId that created the session.ClusterStoreAndForward.DataAccessException
- if problems with underlying data store.
-
deregisterSessionFromThisNode
public void deregisterSessionFromThisNode(String matsSocketSessionId, String connectionId) throws ClusterStoreAndForward.DataAccessException Description copied from interface:ClusterStoreAndForward
Deregisters a Session home when a WebSocket is closed. This node's nodename and this WebSocket Session's ConnectionId is taken into account, so that if it has changed async, it will not deregister a new session home (which can potentially be on the same node the 'connectionId' parameter).- Specified by:
deregisterSessionFromThisNode
in interfaceClusterStoreAndForward
- Parameters:
matsSocketSessionId
- the MatsSocketSessionId for which to deregister the specific WebSocket Session's ConnectionId.connectionId
- an id that is unique for this specific WebSocket Session (i.e. TCP Connection), so that if it closes, a new registration will not be deregistered by the old MatsSocketSession realizing that it is closed and then invokingClusterStoreAndForward.deregisterSessionFromThisNode(String, String)
- Throws:
ClusterStoreAndForward.DataAccessException
-
getCurrentRegisteredNodeForSession
public Optional<ClusterStoreAndForward.CurrentNode> getCurrentRegisteredNodeForSession(String matsSocketSessionId) throws ClusterStoreAndForward.DataAccessException - Specified by:
getCurrentRegisteredNodeForSession
in interfaceClusterStoreAndForward
- Parameters:
matsSocketSessionId
- the MatsSocketSessionId for which to find current session home.- Returns:
- the current node holding MatsSocket Session, or empty if none.
- Throws:
ClusterStoreAndForward.DataAccessException
-
isSessionExists
public boolean isSessionExists(String matsSocketSessionId) throws ClusterStoreAndForward.DataAccessException - Specified by:
isSessionExists
in interfaceClusterStoreAndForward
- Parameters:
matsSocketSessionId
- the MatsSocketSessionId for which to check whether there is a session- Returns:
- whether a CSAF Session exists - NOTE: Not whether it is currently registered!
- Throws:
ClusterStoreAndForward.DataAccessException
-
getSessions
public List<MatsSocketServer.MatsSocketSessionDto> getSessions(boolean onlyActive, String userId, String appName, String appVersionAtOrAbove) throws ClusterStoreAndForward.DataAccessException Description copied from interface:ClusterStoreAndForward
Direct implementation ofMatsSocketServer.getMatsSocketSessions(boolean, String, String, String)
- go read there for semantics.- Specified by:
getSessions
in interfaceClusterStoreAndForward
- Returns:
- the list of all MatsSocketSessions currently registered with this MatsSocketServer instance matching the
constraints if set - as read from the
data store
. - Throws:
ClusterStoreAndForward.DataAccessException
-
getSessionsCount
public int getSessionsCount(boolean onlyActive, String userId, String appName, String appVersionAtOrAbove) throws ClusterStoreAndForward.DataAccessException Description copied from interface:ClusterStoreAndForward
Direct implementation ofMatsSocketServer.getMatsSocketSessionsCount(boolean, String, String, String)
- go read there for semantics.- Specified by:
getSessionsCount
in interfaceClusterStoreAndForward
- Returns:
- the count of all MatsSocketSessions currently registered with this MatsSocketServer instance matching the
constraints if set - as read from the
data store
. - Throws:
ClusterStoreAndForward.DataAccessException
-
closeSession
public void closeSession(String matsSocketSessionId) throws ClusterStoreAndForward.DataAccessException Description copied from interface:ClusterStoreAndForward
Invoked when the client explicitly tells us that he closed this session, CLOSE_SESSION. This deletes the session instance, and any messages in queue for it. No new incoming REPLYs, SENDs or RECEIVEs for this SessionId will be sent anywhere. (Notice that the implementation might still store any new outboxed messages, not checking that the session actually exists. But this SessionId will never be reused (exception without extreme "luck"), and the implementation will periodically purge such non-session-attached outbox messages).- Specified by:
closeSession
in interfaceClusterStoreAndForward
- Parameters:
matsSocketSessionId
- the MatsSocketSessionId that should be closed.- Throws:
ClusterStoreAndForward.DataAccessException
-
notifySessionLiveliness
public void notifySessionLiveliness(Collection<String> matsSocketSessionIds) throws ClusterStoreAndForward.DataAccessException Description copied from interface:ClusterStoreAndForward
Shall be invoked on some kind of schedule (e.g. every 1 minute) by every node to informClusterStoreAndForward
about which Sessions are currently live on that node. Sessions that aren't live, will be scavenged after some time by invocation ofClusterStoreAndForward.timeoutSessions(long)
.- Specified by:
notifySessionLiveliness
in interfaceClusterStoreAndForward
- Parameters:
matsSocketSessionIds
- which sessions are currently live on the invoking node.- Throws:
ClusterStoreAndForward.DataAccessException
-
timeoutSessions
public Collection<String> timeoutSessions(long notLiveSinceTimestamp) throws ClusterStoreAndForward.DataAccessException Description copied from interface:ClusterStoreAndForward
Shall be invoked on some kind of schedule (e.g. every 10 minutes on average). Times out any session which have not beennotified about liveliness
since the supplied timestamp (millis from epoch).- Specified by:
timeoutSessions
in interfaceClusterStoreAndForward
- Parameters:
notLiveSinceTimestamp
- decides which sessions are too old: Sessions which have not beennotified about liveliness
since this timestamp will be deleted.- Returns:
- which sessions was timed out.
- Throws:
ClusterStoreAndForward.DataAccessException
-
scavengeSessionRemnants
Description copied from interface:ClusterStoreAndForward
Shall be invoked on some kind of schedule (e.g. every hour on average). Scavenges all inboxes, outboxes and "request boxes" for any lingering data for closed and timed out sessions.- Specified by:
scavengeSessionRemnants
in interfaceClusterStoreAndForward
- Returns:
- how many items was scavenged, for logging. May return constant zero if not readily available.
- Throws:
ClusterStoreAndForward.DataAccessException
-
storeMessageIdInInbox
public void storeMessageIdInInbox(String matsSocketSessionId, String clientMessageId) throws ClusterStoreAndForward.MessageIdAlreadyExistsException, ClusterStoreAndForward.DataAccessException Description copied from interface:ClusterStoreAndForward
Stores the incoming message Id, to avoid double delivery. If the Client MessageId (cmid) already exists, aClusterStoreAndForward.MessageIdAlreadyExistsException
will be raised - this implies that a double delivery has occurred.- Specified by:
storeMessageIdInInbox
in interfaceClusterStoreAndForward
- Parameters:
matsSocketSessionId
- the MatsSocketSessionId for which to store the incoming message id.clientMessageId
- the client's message Id for the incoming message.- Throws:
ClusterStoreAndForward.MessageIdAlreadyExistsException
- if the Client MessageId (cmid) already existed for this SessionId.ClusterStoreAndForward.DataAccessException
-
updateMessageInInbox
public void updateMessageInInbox(String matsSocketSessionId, String clientMessageId, String envelopeWithMessage, byte[] messageBinary) throws ClusterStoreAndForward.DataAccessException Description copied from interface:ClusterStoreAndForward
Stores the resulting message (envelope and binary), so that if the incoming messages comes again (based onClusterStoreAndForward.storeMessageIdInInbox(String, String)
throwingClusterStoreAndForward.MessageIdAlreadyExistsException
), the result from the previous processing can be returned right away.- Specified by:
updateMessageInInbox
in interfaceClusterStoreAndForward
- Throws:
ClusterStoreAndForward.DataAccessException
-
getMessageFromInbox
public ClusterStoreAndForward.StoredInMessage getMessageFromInbox(String matsSocketSessionId, String clientMessageId) throws ClusterStoreAndForward.DataAccessException - Specified by:
getMessageFromInbox
in interfaceClusterStoreAndForward
- Throws:
ClusterStoreAndForward.DataAccessException
-
deleteMessageIdsFromInbox
public void deleteMessageIdsFromInbox(String matsSocketSessionId, Collection<String> clientMessageIds) throws ClusterStoreAndForward.DataAccessException Description copied from interface:ClusterStoreAndForward
Deletes the incoming message Ids, as we've established that the client will never try to send this particular message again.- Specified by:
deleteMessageIdsFromInbox
in interfaceClusterStoreAndForward
- Parameters:
matsSocketSessionId
- the MatsSocketSessionId for which to delete the incoming message id.clientMessageIds
- the client's message Ids for the incoming messages to delete.- Throws:
ClusterStoreAndForward.DataAccessException
-
storeMessageInOutbox
public Optional<ClusterStoreAndForward.CurrentNode> storeMessageInOutbox(String matsSocketSessionId, String serverMessageId, String clientMessageId, String traceId, MatsSocketServer.MessageType type, Long requestTimestamp, String envelope, String messageJson, byte[] messageBinary) throws ClusterStoreAndForward.DataAccessException, ClusterStoreAndForward.MessageIdAlreadyExistsException Description copied from interface:ClusterStoreAndForward
Stores the message for the Session, returning the nodename for the node holding the session, if any. If the session is closed/timed out, the message won't be stored (i.e. dumped on the floor) and the return value is empty. The Server MessageId (smid) is set by the caller - and since this might have a collision, the method throwsClusterStoreAndForward.MessageIdAlreadyExistsException
if unique constraint fails. In this case, a new Server MessageId should be picked and try again.- Specified by:
storeMessageInOutbox
in interfaceClusterStoreAndForward
- Parameters:
matsSocketSessionId
- the matsSocketSessionId that the message is meant for.serverMessageId
- unique id for outbox'ed message within the MatsSocketSessionId. Must be long enough that it is extremely unlikely that is collides with another message within the same MatsSocketSessionId - but do remember that there will at any one time be pretty few messages in the outbox for a given MatsSocketSession.clientMessageId
- For Client Replies to requests from Server-to-Client, this is the Client's Message Id, which we need to send back with the Reply so that the Client can correlate the Request. (Nullable, since not all messages are replies).traceId
- the server-side traceId for this message.type
- the type of the outgoing message (RESOLVE, REJECT or PUB).requestTimestamp
- For Client Replies to requests from Server-to-Client, this is the timestamp we received the Request.envelope
- the JSON-serialized MatsSocket Envelope without the 'msg' field set.messageJson
- the JSON-serialized message - the piece that sits in the 'msg' field of the Envelope. (Nullable)messageBinary
- the binary part of an outgoing message. (Nullable)- Returns:
- the current node holding MatsSocket Session, or empty if none.
- Throws:
ClusterStoreAndForward.MessageIdAlreadyExistsException
- if the Server MessageId (smid) already existed for this SessionId.ClusterStoreAndForward.DataAccessException
-
getMessagesFromOutbox
public List<ClusterStoreAndForward.StoredOutMessage> getMessagesFromOutbox(String matsSocketSessionId, int maxNumberOfMessages) throws ClusterStoreAndForward.DataAccessException Description copied from interface:ClusterStoreAndForward
Fetch a set of messages, up to 'maxNumberOfMessages' - but do not include messages that have been attempted delivered already (marked withClusterStoreAndForward.outboxMessagesAttemptedDelivery(String, Collection)
). IfClusterStoreAndForward.outboxMessagesUnmarkAttemptedDelivery(String)
is invoked, that mark will be unset.- Specified by:
getMessagesFromOutbox
in interfaceClusterStoreAndForward
- Parameters:
matsSocketSessionId
- the matsSocketSessionId that the message is meant for.maxNumberOfMessages
- the maximum number of messages to fetch.- Returns:
- a list of json encoded messages destined for the WebSocket.
- Throws:
ClusterStoreAndForward.DataAccessException
-
outboxMessagesAttemptedDelivery
public void outboxMessagesAttemptedDelivery(String matsSocketSessionId, Collection<String> serverMessageIds) throws ClusterStoreAndForward.DataAccessException Description copied from interface:ClusterStoreAndForward
Marks the specified messages as attempted delivered and notches theClusterStoreAndForward.StoredOutMessage.getDeliveryCount()
one up. IfClusterStoreAndForward.outboxMessagesUnmarkAttemptedDelivery(String)
is invoked (typically on reconnect), the mark will be unset, but the delivery count will stay in place - this is to be able to abort delivery attempts if there is something wrong with the message.- Specified by:
outboxMessagesAttemptedDelivery
in interfaceClusterStoreAndForward
- Parameters:
matsSocketSessionId
- the matsSocketSessionId that the serverMessageIds refers to.serverMessageIds
- which messages failed delivery.- Throws:
ClusterStoreAndForward.DataAccessException
-
outboxMessagesUnmarkAttemptedDelivery
public void outboxMessagesUnmarkAttemptedDelivery(String matsSocketSessionId) throws ClusterStoreAndForward.DataAccessException Description copied from interface:ClusterStoreAndForward
When this method is invoked,ClusterStoreAndForward.getMessagesFromOutbox(String, int)
will again return messages that has previously been marked as attempted delivered withClusterStoreAndForward.outboxMessagesAttemptedDelivery(String, Collection)
. Notice that theClusterStoreAndForward.StoredOutMessage.getDeliveryCount()
will not be reset, but theClusterStoreAndForward.StoredOutMessage.getAttemptTimestamp()
will now again return null.- Specified by:
outboxMessagesUnmarkAttemptedDelivery
in interfaceClusterStoreAndForward
- Parameters:
matsSocketSessionId
- the matsSocketSessionId whose messages now shall be attempted.- Throws:
ClusterStoreAndForward.DataAccessException
-
outboxMessagesComplete
public void outboxMessagesComplete(String matsSocketSessionId, Collection<String> serverMessageIds) throws ClusterStoreAndForward.DataAccessException Description copied from interface:ClusterStoreAndForward
States that the messages are delivered. Will typically delete the message.- Specified by:
outboxMessagesComplete
in interfaceClusterStoreAndForward
- Parameters:
matsSocketSessionId
- the matsSocketSessionId that the serverMessageIds refers to.serverMessageIds
- which messages are complete.- Throws:
ClusterStoreAndForward.DataAccessException
-
outboxMessagesDeadLetterQueue
public void outboxMessagesDeadLetterQueue(String matsSocketSessionId, Collection<String> serverMessageIds) throws ClusterStoreAndForward.DataAccessException Description copied from interface:ClusterStoreAndForward
States that the messages overran the accepted number of delivery attempts.- Specified by:
outboxMessagesDeadLetterQueue
in interfaceClusterStoreAndForward
- Parameters:
matsSocketSessionId
- the matsSocketSessionId that the serverMessageIds refers to.serverMessageIds
- which messages should be DLQed.- Throws:
ClusterStoreAndForward.DataAccessException
-
storeRequestCorrelation
public void storeRequestCorrelation(String matsSocketSessionId, String serverMessageId, long requestTimestamp, String replyTerminatorId, String correlationString, byte[] correlationBinary) throws ClusterStoreAndForward.DataAccessException - Specified by:
storeRequestCorrelation
in interfaceClusterStoreAndForward
- Throws:
ClusterStoreAndForward.DataAccessException
-
getAndDeleteRequestCorrelation
public Optional<ClusterStoreAndForward.RequestCorrelation> getAndDeleteRequestCorrelation(String matsSocketSessionId, String serverMessageId) throws ClusterStoreAndForward.DataAccessException - Specified by:
getAndDeleteRequestCorrelation
in interfaceClusterStoreAndForward
- Throws:
ClusterStoreAndForward.DataAccessException
-