Package io.mats3.matssocket.impl
Class ClusterStoreAndForward_SQL
java.lang.Object
io.mats3.matssocket.impl.ClusterStoreAndForward_SQL
- All Implemented Interfaces:
ClusterStoreAndForward
An implementation of CSAF relying on a shared SQL database to store the necessary information in a cluster setting.
NOTE: This CSAF implementation expects that the database tables are in place. A tool is provided for this,
using Flyway:
ClusterStoreAndForward_SQL_DbMigrations.
NOTE: This implementation relies heavily on the Mats MatsFactory.ContextLocal feature, whereby the current Mats
StageProcessor-contextual SQL Connection is available using the MatsFactory.ContextLocal.getAttribute(Class, String...)
method. This is because several of the methods on this interface will be invoked within a Mats initiation or Stage process
lambda, and thus participating in the transactional demarcation established there is vital to achieve the guaranteed
delivery and exactly-once delivery semantics.
Nested Class Summary
Nested classes/interfaces inherited from interface io.mats3.matssocket.ClusterStoreAndForward
ClusterStoreAndForward.CurrentNode, ClusterStoreAndForward.DataAccessException, ClusterStoreAndForward.MessageIdAlreadyExistsException, ClusterStoreAndForward.RequestCorrelation, ClusterStoreAndForward.SimpleCurrentNode, ClusterStoreAndForward.SimpleRequestCorrelation, ClusterStoreAndForward.SimpleStoredInMessage, ClusterStoreAndForward.SimpleStoredOutMessage, ClusterStoreAndForward.StoredInMessage, ClusterStoreAndForward.StoredOutMessage, ClusterStoreAndForward.WrongUserException
Constructor Summary
Constructors
Modifier    Constructor
protected   ClusterStoreAndForward_SQL(DataSource dataSource, String nodename, Clock clock)
Method Summary
Each entry below gives the modifier and type, then the method, then the description.

void boot()
    Start the ClusterStoreAndForward, perform DB preparations and migrations.
void closeSession(String matsSocketSessionId)
    Invoked when the client explicitly tells us that it has closed this session, CLOSE_SESSION.
static ClusterStoreAndForward_SQL create(DataSource dataSource, String nodename)
void deleteMessageIdsFromInbox(String matsSocketSessionId, Collection<String> clientMessageIds)
    Deletes the incoming message Ids, as we've established that the client will never try to send this particular message again.
void deregisterSessionFromThisNode(String matsSocketSessionId, String connectionId)
    Deregisters a Session home when a WebSocket is closed.
Optional<ClusterStoreAndForward.RequestCorrelation> getAndDeleteRequestCorrelation(String matsSocketSessionId, String serverMessageId)
Optional<ClusterStoreAndForward.CurrentNode> getCurrentRegisteredNodeForSession(String matsSocketSessionId)
ClusterStoreAndForward.StoredInMessage getMessageFromInbox(String matsSocketSessionId, String clientMessageId)
List<ClusterStoreAndForward.StoredOutMessage> getMessagesFromOutbox(String matsSocketSessionId, int maxNumberOfMessages)
    Fetch a set of messages, up to 'maxNumberOfMessages' - but do not include messages that have been attempted delivered already (marked with ClusterStoreAndForward.outboxMessagesAttemptedDelivery(String, Collection)).
List<MatsSocketServer.MatsSocketSessionDto> getSessions(boolean onlyActive, String userId, String appName, String appVersionAtOrAbove)
    Direct implementation of MatsSocketServer.getMatsSocketSessions(boolean, String, String, String) - go read there for semantics.
int getSessionsCount(boolean onlyActive, String userId, String appName, String appVersionAtOrAbove)
    Direct implementation of MatsSocketServer.getMatsSocketSessionsCount(boolean, String, String, String) - go read there for semantics.
boolean isSessionExists(String matsSocketSessionId)
void notifySessionLiveliness(Collection<String> matsSocketSessionIds)
    Shall be invoked on some kind of schedule (e.g. every 1 minute) by every node to inform ClusterStoreAndForward about which Sessions are currently live on that node.
void outboxMessagesAttemptedDelivery(String matsSocketSessionId, Collection<String> serverMessageIds)
    Marks the specified messages as attempted delivered and notches the ClusterStoreAndForward.StoredOutMessage.getDeliveryCount() one up.
void outboxMessagesComplete(String matsSocketSessionId, Collection<String> serverMessageIds)
    States that the messages are delivered.
void outboxMessagesDeadLetterQueue(String matsSocketSessionId, Collection<String> serverMessageIds)
    States that the messages overran the accepted number of delivery attempts.
void outboxMessagesUnmarkAttemptedDelivery(String matsSocketSessionId)
    When this method is invoked, ClusterStoreAndForward.getMessagesFromOutbox(String, int) will again return messages that have previously been marked as attempted delivered with ClusterStoreAndForward.outboxMessagesAttemptedDelivery(String, Collection).
long registerSessionAtThisNode(String matsSocketSessionId, String userId, String connectionId, String clientLibAndVersions, String appName, String appVersion)
    Registers a Session home to this node - only one node can ever be home, so any old is deleted.
int scavengeSessionRemnants
    Shall be invoked on some kind of schedule (e.g. every hour on average).
void storeMessageIdInInbox(String matsSocketSessionId, String clientMessageId)
    Stores the incoming message Id, to avoid double delivery.
Optional<ClusterStoreAndForward.CurrentNode> storeMessageInOutbox(String matsSocketSessionId, String serverMessageId, String clientMessageId, String traceId, MatsSocketServer.MessageType type, Long requestTimestamp, String envelope, String messageJson, byte[] messageBinary)
    Stores the message for the Session, returning the nodename for the node holding the session, if any.
void storeRequestCorrelation(String matsSocketSessionId, String serverMessageId, long requestTimestamp, String replyTerminatorId, String correlationString, byte[] correlationBinary)
Collection<String> timeoutSessions(long notLiveSinceTimestamp)
    Shall be invoked on some kind of schedule (e.g. every 10 minutes on average).
void updateMessageInInbox(String matsSocketSessionId, String clientMessageId, String envelopeWithMessage, byte[] messageBinary)
    Stores the resulting message (envelope and binary), so that if the incoming message comes again (based on ClusterStoreAndForward.storeMessageIdInInbox(String, String) throwing ClusterStoreAndForward.MessageIdAlreadyExistsException), the result from the previous processing can be returned right away.
-
Constructor Details
-
ClusterStoreAndForward_SQL
-
-
Method Details
-
create
-
boot
public void boot()
Description copied from interface: ClusterStoreAndForward
Start the ClusterStoreAndForward, perform DB preparations and migrations.
- Specified by:
boot in interface ClusterStoreAndForward
-
registerSessionAtThisNode
public long registerSessionAtThisNode(String matsSocketSessionId, String userId, String connectionId, String clientLibAndVersions, String appName, String appVersion) throws ClusterStoreAndForward.WrongUserException, ClusterStoreAndForward.DataAccessException
Description copied from interface: ClusterStoreAndForward
Registers a Session home to this node - only one node can ever be home, so any old registration is deleted. When "re-registering" a session, it is asserted that the provided 'userId' is the same UserId as originally registered - a ClusterStoreAndForward.WrongUserException is thrown if this does not match.
- Specified by:
registerSessionAtThisNode in interface ClusterStoreAndForward
- Parameters:
matsSocketSessionId - the SessionId for this connection.
userId - the UserId, as provided by AuthenticationPlugin, that owns this MatsSocketSessionId
connectionId - an id that is unique for this specific WebSocket Session (i.e. TCP Connection), so that if it closes, a new registration will not be deregistered by the old MatsSocketSession realizing that it is closed and then invoking ClusterStoreAndForward.deregisterSessionFromThisNode(String, String)
clientLibAndVersions - the Client Library and Versions + runtime information for the Client.
appName - the AppName of the accessing Client app
appVersion - the AppVersion of the accessing Client app
- Returns:
the created timestamp - which is either "now" if this is the first registration, or, if it is a "reconnect", the time when this MatsSocketSession was initially created
- Throws:
ClusterStoreAndForward.WrongUserException - if the userId provided does not match the original userId that created the session.
ClusterStoreAndForward.DataAccessException - if problems with underlying data store.
-
deregisterSessionFromThisNode
public void deregisterSessionFromThisNode(String matsSocketSessionId, String connectionId) throws ClusterStoreAndForward.DataAccessException
Description copied from interface: ClusterStoreAndForward
Deregisters a Session home when a WebSocket is closed. This node's nodename and this WebSocket Session's ConnectionId are taken into account, so that if the home has changed asynchronously, it will not deregister a new session home (which can potentially be on the same node, hence the 'connectionId' parameter).
- Specified by:
deregisterSessionFromThisNode in interface ClusterStoreAndForward
- Parameters:
matsSocketSessionId - the MatsSocketSessionId for which to deregister the specific WebSocket Session's ConnectionId.
connectionId - an id that is unique for this specific WebSocket Session (i.e. TCP Connection), so that if it closes, a new registration will not be deregistered by the old MatsSocketSession realizing that it is closed and then invoking ClusterStoreAndForward.deregisterSessionFromThisNode(String, String)
- Throws:
ClusterStoreAndForward.DataAccessException
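The interplay between registerSessionAtThisNode and deregisterSessionFromThisNode can be illustrated with a small in-memory model. This is only a sketch of the documented semantics, not the SQL implementation: the class SessionHomeSketch, its map-based storage, the explicit 'now' argument and the use of IllegalStateException in place of ClusterStoreAndForward.WrongUserException are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical in-memory model of session-home registration; not the SQL-backed implementation. */
final class SessionHomeSketch {
    /** One entry per MatsSocketSessionId (hypothetical layout). */
    private static final class Row {
        final String userId;
        final long createdTimestamp;
        String nodename;     // null when no node is currently home
        String connectionId; // null when no node is currently home

        Row(String userId, long createdTimestamp) {
            this.userId = userId;
            this.createdTimestamp = createdTimestamp;
        }
    }

    private final Map<String, Row> sessions = new HashMap<>();
    private final String thisNodename;

    SessionHomeSketch(String thisNodename) {
        this.thisNodename = thisNodename;
    }

    /** Registers this node as home; returns the original creation timestamp on re-registration. */
    long register(String sessionId, String userId, String connectionId, long now) {
        Row row = sessions.get(sessionId);
        if (row == null) {
            row = new Row(userId, now);
            sessions.put(sessionId, row);
        } else if (!row.userId.equals(userId)) {
            // Stand-in for ClusterStoreAndForward.WrongUserException.
            throw new IllegalStateException("userId does not match the original userId");
        }
        // Only one node can be home, so any previous home is overwritten.
        row.nodename = thisNodename;
        row.connectionId = connectionId;
        return row.createdTimestamp;
    }

    /** Deregisters only if the home is still this node AND this connectionId. */
    void deregister(String sessionId, String connectionId) {
        Row row = sessions.get(sessionId);
        if (row != null && thisNodename.equals(row.nodename) && connectionId.equals(row.connectionId)) {
            row.nodename = null;
            row.connectionId = null;
        }
    }

    /** Returns the connectionId of the current home, or empty if none. */
    Optional<String> currentConnectionId(String sessionId) {
        Row row = sessions.get(sessionId);
        return row == null ? Optional.empty() : Optional.ofNullable(row.connectionId);
    }
}
```

In this model, a late deregister call carrying an old connectionId is a no-op, which is the situation the 'connectionId' parameter is described as guarding against.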
-
getCurrentRegisteredNodeForSession
public Optional<ClusterStoreAndForward.CurrentNode> getCurrentRegisteredNodeForSession(String matsSocketSessionId) throws ClusterStoreAndForward.DataAccessException
- Specified by:
getCurrentRegisteredNodeForSession in interface ClusterStoreAndForward
- Parameters:
matsSocketSessionId - the MatsSocketSessionId for which to find current session home.
- Returns:
the current node holding MatsSocket Session, or empty if none.
- Throws:
ClusterStoreAndForward.DataAccessException
-
isSessionExists
public boolean isSessionExists(String matsSocketSessionId) throws ClusterStoreAndForward.DataAccessException
- Specified by:
isSessionExists in interface ClusterStoreAndForward
- Parameters:
matsSocketSessionId - the MatsSocketSessionId for which to check whether there is a session
- Returns:
whether a CSAF Session exists - NOTE: Not whether it is currently registered!
- Throws:
ClusterStoreAndForward.DataAccessException
-
getSessions
public List<MatsSocketServer.MatsSocketSessionDto> getSessions(boolean onlyActive, String userId, String appName, String appVersionAtOrAbove) throws ClusterStoreAndForward.DataAccessException
Description copied from interface: ClusterStoreAndForward
Direct implementation of MatsSocketServer.getMatsSocketSessions(boolean, String, String, String) - go read there for semantics.
- Specified by:
getSessions in interface ClusterStoreAndForward
- Returns:
the list of all MatsSocketSessions currently registered with this MatsSocketServer instance matching the constraints if set - as read from the data store.
- Throws:
ClusterStoreAndForward.DataAccessException
-
getSessionsCount
public int getSessionsCount(boolean onlyActive, String userId, String appName, String appVersionAtOrAbove) throws ClusterStoreAndForward.DataAccessException
Description copied from interface: ClusterStoreAndForward
Direct implementation of MatsSocketServer.getMatsSocketSessionsCount(boolean, String, String, String) - go read there for semantics.
- Specified by:
getSessionsCount in interface ClusterStoreAndForward
- Returns:
the count of all MatsSocketSessions currently registered with this MatsSocketServer instance matching the constraints if set - as read from the data store.
- Throws:
ClusterStoreAndForward.DataAccessException
-
closeSession
public void closeSession(String matsSocketSessionId) throws ClusterStoreAndForward.DataAccessException
Description copied from interface: ClusterStoreAndForward
Invoked when the client explicitly tells us that it has closed this session, CLOSE_SESSION. This deletes the session instance, and any messages in queue for it. No new incoming REPLYs, SENDs or RECEIVEs for this SessionId will be sent anywhere. (Notice that the implementation might still store any new outboxed messages, not checking that the session actually exists. But this SessionId will never be reused (except with extreme "luck"), and the implementation will periodically purge such non-session-attached outbox messages).
- Specified by:
closeSession in interface ClusterStoreAndForward
- Parameters:
matsSocketSessionId - the MatsSocketSessionId that should be closed.
- Throws:
ClusterStoreAndForward.DataAccessException
-
notifySessionLiveliness
public void notifySessionLiveliness(Collection<String> matsSocketSessionIds) throws ClusterStoreAndForward.DataAccessException
Description copied from interface: ClusterStoreAndForward
Shall be invoked on some kind of schedule (e.g. every 1 minute) by every node to inform ClusterStoreAndForward about which Sessions are currently live on that node. Sessions that aren't live will be scavenged after some time by invocation of ClusterStoreAndForward.timeoutSessions(long).
- Specified by:
notifySessionLiveliness in interface ClusterStoreAndForward
- Parameters:
matsSocketSessionIds - which sessions are currently live on the invoking node.
- Throws:
ClusterStoreAndForward.DataAccessException
-
timeoutSessions
public Collection<String> timeoutSessions(long notLiveSinceTimestamp) throws ClusterStoreAndForward.DataAccessException
Description copied from interface: ClusterStoreAndForward
Shall be invoked on some kind of schedule (e.g. every 10 minutes on average). Times out any session which has not been notified about liveliness since the supplied timestamp (millis from epoch).
- Specified by:
timeoutSessions in interface ClusterStoreAndForward
- Parameters:
notLiveSinceTimestamp - decides which sessions are too old: Sessions which have not been notified about liveliness since this timestamp will be deleted.
- Returns:
which sessions were timed out.
- Throws:
ClusterStoreAndForward.DataAccessException
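The relationship between notifySessionLiveliness and timeoutSessions can be sketched with an in-memory map of "last live" timestamps. This is an illustrative model only: the class LivelinessSketch, the explicit 'now' argument and the strict "less than" comparison at the boundary are assumptions, not details taken from the SQL implementation.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of liveliness tracking; not the SQL-backed implementation. */
final class LivelinessSketch {
    // MatsSocketSessionId -> last liveliness timestamp (millis from epoch).
    private final Map<String, Long> lastLive = new HashMap<>();

    /** Models notifySessionLiveliness: refreshes the timestamp of each live session. */
    void notifyLiveliness(Collection<String> sessionIds, long now) {
        for (String id : sessionIds) {
            lastLive.put(id, now);
        }
    }

    /** Models timeoutSessions: deletes and returns sessions not notified since the timestamp. */
    List<String> timeoutSessions(long notLiveSinceTimestamp) {
        List<String> timedOut = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastLive.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            // Assumption: a session notified exactly at the cutoff is kept.
            if (entry.getValue() < notLiveSinceTimestamp) {
                timedOut.add(entry.getKey());
                it.remove();
            }
        }
        return timedOut;
    }
}
```

A node would typically call the first method with the ids of its open sessions on a short schedule, while the second runs less often with a cutoff some way in the past.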
-
scavengeSessionRemnants
Description copied from interface: ClusterStoreAndForward
Shall be invoked on some kind of schedule (e.g. every hour on average). Scavenges all inboxes, outboxes and "request boxes" for any lingering data for closed and timed out sessions.
- Specified by:
scavengeSessionRemnants in interface ClusterStoreAndForward
- Returns:
how many items were scavenged, for logging. May return constant zero if not readily available.
- Throws:
ClusterStoreAndForward.DataAccessException
-
storeMessageIdInInbox
public void storeMessageIdInInbox(String matsSocketSessionId, String clientMessageId) throws ClusterStoreAndForward.MessageIdAlreadyExistsException, ClusterStoreAndForward.DataAccessException
Description copied from interface: ClusterStoreAndForward
Stores the incoming message Id, to avoid double delivery. If the Client MessageId (cmid) already exists, a ClusterStoreAndForward.MessageIdAlreadyExistsException will be raised - this implies that a double delivery has occurred.
- Specified by:
storeMessageIdInInbox in interface ClusterStoreAndForward
- Parameters:
matsSocketSessionId - the MatsSocketSessionId for which to store the incoming message id.
clientMessageId - the client's message Id for the incoming message.
- Throws:
ClusterStoreAndForward.MessageIdAlreadyExistsException - if the Client MessageId (cmid) already existed for this SessionId.
ClusterStoreAndForward.DataAccessException
-
updateMessageInInbox
public void updateMessageInInbox(String matsSocketSessionId, String clientMessageId, String envelopeWithMessage, byte[] messageBinary) throws ClusterStoreAndForward.DataAccessException
Description copied from interface: ClusterStoreAndForward
Stores the resulting message (envelope and binary), so that if the incoming message comes again (based on ClusterStoreAndForward.storeMessageIdInInbox(String, String) throwing ClusterStoreAndForward.MessageIdAlreadyExistsException), the result from the previous processing can be returned right away.
- Specified by:
updateMessageInInbox in interface ClusterStoreAndForward
- Throws:
ClusterStoreAndForward.DataAccessException
-
getMessageFromInbox
public ClusterStoreAndForward.StoredInMessage getMessageFromInbox(String matsSocketSessionId, String clientMessageId) throws ClusterStoreAndForward.DataAccessException
- Specified by:
getMessageFromInbox in interface ClusterStoreAndForward
- Throws:
ClusterStoreAndForward.DataAccessException
-
deleteMessageIdsFromInbox
public void deleteMessageIdsFromInbox(String matsSocketSessionId, Collection<String> clientMessageIds) throws ClusterStoreAndForward.DataAccessException
Description copied from interface: ClusterStoreAndForward
Deletes the incoming message Ids, as we've established that the client will never try to send this particular message again.
- Specified by:
deleteMessageIdsFromInbox in interface ClusterStoreAndForward
- Parameters:
matsSocketSessionId - the MatsSocketSessionId for which to delete the incoming message id.
clientMessageIds - the client's message Ids for the incoming messages to delete.
- Throws:
ClusterStoreAndForward.DataAccessException
-
storeMessageInOutbox
public Optional<ClusterStoreAndForward.CurrentNode> storeMessageInOutbox(String matsSocketSessionId, String serverMessageId, String clientMessageId, String traceId, MatsSocketServer.MessageType type, Long requestTimestamp, String envelope, String messageJson, byte[] messageBinary) throws ClusterStoreAndForward.DataAccessException, ClusterStoreAndForward.MessageIdAlreadyExistsException
Description copied from interface: ClusterStoreAndForward
Stores the message for the Session, returning the nodename for the node holding the session, if any. If the session is closed/timed out, the message won't be stored (i.e. dumped on the floor) and the return value is empty. The Server MessageId (smid) is set by the caller - and since this might have a collision, the method throws ClusterStoreAndForward.MessageIdAlreadyExistsException if the unique constraint fails. In this case, a new Server MessageId should be picked and the store attempted again.
- Specified by:
storeMessageInOutbox in interface ClusterStoreAndForward
- Parameters:
matsSocketSessionId - the matsSocketSessionId that the message is meant for.
serverMessageId - unique id for outbox'ed message within the MatsSocketSessionId. Must be long enough that it is extremely unlikely that it collides with another message within the same MatsSocketSessionId - but do remember that there will at any one time be pretty few messages in the outbox for a given MatsSocketSession.
clientMessageId - For Client Replies to requests from Server-to-Client, this is the Client's Message Id, which we need to send back with the Reply so that the Client can correlate the Request. (Nullable, since not all messages are replies).
traceId - the server-side traceId for this message.
type - the type of the outgoing message (RESOLVE, REJECT or PUB).
requestTimestamp - For Client Replies to requests from Server-to-Client, this is the timestamp we received the Request.
envelope - the JSON-serialized MatsSocket Envelope without the 'msg' field set.
messageJson - the JSON-serialized message - the piece that sits in the 'msg' field of the Envelope. (Nullable)
messageBinary - the binary part of an outgoing message. (Nullable)
- Returns:
the current node holding MatsSocket Session, or empty if none.
- Throws:
ClusterStoreAndForward.MessageIdAlreadyExistsException - if the Server MessageId (smid) already existed for this SessionId.
ClusterStoreAndForward.DataAccessException
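The "pick a new Server MessageId and try again" advice for storeMessageInOutbox amounts to a bounded retry loop on the caller's side. The sketch below models it in memory; OutboxStoreRetrySketch, DuplicateMessageIdException (standing in for ClusterStoreAndForward.MessageIdAlreadyExistsException), the id supplier and the maxAttempts bound are assumptions for illustration, and the message payload is omitted.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical in-memory model of smid collision handling; not the SQL-backed implementation. */
final class OutboxStoreRetrySketch {
    /** Stand-in for ClusterStoreAndForward.MessageIdAlreadyExistsException. */
    static final class DuplicateMessageIdException extends Exception {
        DuplicateMessageIdException(String message) {
            super(message);
        }
    }

    // Models the unique constraint on (sessionId, serverMessageId).
    private final Set<String> storedKeys = new HashSet<>();

    /** Models the store: fails if the smid already exists for the session. */
    void store(String sessionId, String smid) throws DuplicateMessageIdException {
        if (!storedKeys.add(sessionId + '/' + smid)) {
            throw new DuplicateMessageIdException("smid already exists: " + smid);
        }
    }

    /** Tries ids from the supplier until one is stored; returns the id that was used. */
    String storeWithRetry(String sessionId, Supplier<String> idSupplier, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            String smid = idSupplier.get();
            try {
                store(sessionId, smid);
                return smid;
            } catch (DuplicateMessageIdException e) {
                // Collision: loop around to pick a new Server MessageId.
            }
        }
        throw new IllegalStateException("No unique smid found after " + maxAttempts + " attempts");
    }
}
```

Bounding the number of attempts is a design choice of this sketch; with sufficiently long random ids a second attempt should rarely be needed.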
-
getMessagesFromOutbox
public List<ClusterStoreAndForward.StoredOutMessage> getMessagesFromOutbox(String matsSocketSessionId, int maxNumberOfMessages) throws ClusterStoreAndForward.DataAccessException
Description copied from interface: ClusterStoreAndForward
Fetch a set of messages, up to 'maxNumberOfMessages' - but do not include messages that have been attempted delivered already (marked with ClusterStoreAndForward.outboxMessagesAttemptedDelivery(String, Collection)). If ClusterStoreAndForward.outboxMessagesUnmarkAttemptedDelivery(String) is invoked, that mark will be unset.
- Specified by:
getMessagesFromOutbox in interface ClusterStoreAndForward
- Parameters:
matsSocketSessionId - the matsSocketSessionId that the message is meant for.
maxNumberOfMessages - the maximum number of messages to fetch.
- Returns:
a list of json encoded messages destined for the WebSocket.
- Throws:
ClusterStoreAndForward.DataAccessException
-
outboxMessagesAttemptedDelivery
public void outboxMessagesAttemptedDelivery(String matsSocketSessionId, Collection<String> serverMessageIds) throws ClusterStoreAndForward.DataAccessException
Description copied from interface: ClusterStoreAndForward
Marks the specified messages as attempted delivered and notches the ClusterStoreAndForward.StoredOutMessage.getDeliveryCount() one up. If ClusterStoreAndForward.outboxMessagesUnmarkAttemptedDelivery(String) is invoked (typically on reconnect), the mark will be unset, but the delivery count will stay in place - this is to be able to abort delivery attempts if there is something wrong with the message.
- Specified by:
outboxMessagesAttemptedDelivery in interface ClusterStoreAndForward
- Parameters:
matsSocketSessionId - the matsSocketSessionId that the serverMessageIds refers to.
serverMessageIds - which messages failed delivery.
- Throws:
ClusterStoreAndForward.DataAccessException
-
outboxMessagesUnmarkAttemptedDelivery
public void outboxMessagesUnmarkAttemptedDelivery(String matsSocketSessionId) throws ClusterStoreAndForward.DataAccessException
Description copied from interface: ClusterStoreAndForward
When this method is invoked, ClusterStoreAndForward.getMessagesFromOutbox(String, int) will again return messages that have previously been marked as attempted delivered with ClusterStoreAndForward.outboxMessagesAttemptedDelivery(String, Collection). Notice that the ClusterStoreAndForward.StoredOutMessage.getDeliveryCount() will not be reset, but the ClusterStoreAndForward.StoredOutMessage.getAttemptTimestamp() will now again return null.
- Specified by:
outboxMessagesUnmarkAttemptedDelivery in interface ClusterStoreAndForward
- Parameters:
matsSocketSessionId - the matsSocketSessionId whose messages now shall be attempted.
- Throws:
ClusterStoreAndForward.DataAccessException
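The way getMessagesFromOutbox, outboxMessagesAttemptedDelivery, outboxMessagesUnmarkAttemptedDelivery and outboxMessagesComplete relate to each other can be sketched with an in-memory outbox for a single session. The class OutboxDeliverySketch, its Msg holder and the explicit 'now' argument are assumptions for illustration; only the mark, count and unmark behaviour is taken from the descriptions above.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory outbox for one session; not the SQL-backed implementation. */
final class OutboxDeliverySketch {
    /** Hypothetical stored message, reduced to the fields relevant for delivery marking. */
    static final class Msg {
        final String smid;
        int deliveryCount;     // never reset by unmark
        Long attemptTimestamp; // null means "not marked as attempted delivered"

        Msg(String smid) {
            this.smid = smid;
        }
    }

    private final Map<String, Msg> outbox = new LinkedHashMap<>();

    void store(String smid) {
        outbox.put(smid, new Msg(smid));
    }

    /** Returns up to 'max' messages that are not marked as attempted delivered. */
    List<Msg> getMessages(int max) {
        List<Msg> result = new ArrayList<>();
        for (Msg m : outbox.values()) {
            if (result.size() >= max) {
                break;
            }
            if (m.attemptTimestamp == null) {
                result.add(m);
            }
        }
        return result;
    }

    /** Marks the messages as attempted and increments their delivery count. */
    void attemptedDelivery(Collection<String> smids, long now) {
        for (String smid : smids) {
            Msg m = outbox.get(smid);
            if (m != null) {
                m.attemptTimestamp = now;
                m.deliveryCount++;
            }
        }
    }

    /** Clears the attempt mark on all messages; the delivery count stays in place. */
    void unmarkAttemptedDelivery() {
        for (Msg m : outbox.values()) {
            m.attemptTimestamp = null;
        }
    }

    /** Delivery confirmed: this model deletes the messages. */
    void complete(Collection<String> smids) {
        outbox.keySet().removeAll(smids);
    }
}
```

Because the delivery count survives the unmark, a caller can compare it against a limit of its own choosing and hand the message to outboxMessagesDeadLetterQueue instead of retrying indefinitely.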
-
outboxMessagesComplete
public void outboxMessagesComplete(String matsSocketSessionId, Collection<String> serverMessageIds) throws ClusterStoreAndForward.DataAccessException
Description copied from interface: ClusterStoreAndForward
States that the messages are delivered. Will typically delete the message.
- Specified by:
outboxMessagesComplete in interface ClusterStoreAndForward
- Parameters:
matsSocketSessionId - the matsSocketSessionId that the serverMessageIds refers to.
serverMessageIds - which messages are complete.
- Throws:
ClusterStoreAndForward.DataAccessException
-
outboxMessagesDeadLetterQueue
public void outboxMessagesDeadLetterQueue(String matsSocketSessionId, Collection<String> serverMessageIds) throws ClusterStoreAndForward.DataAccessException
Description copied from interface: ClusterStoreAndForward
States that the messages overran the accepted number of delivery attempts.
- Specified by:
outboxMessagesDeadLetterQueue in interface ClusterStoreAndForward
- Parameters:
matsSocketSessionId - the matsSocketSessionId that the serverMessageIds refers to.
serverMessageIds - which messages should be DLQed.
- Throws:
ClusterStoreAndForward.DataAccessException
-
storeRequestCorrelation
public void storeRequestCorrelation(String matsSocketSessionId, String serverMessageId, long requestTimestamp, String replyTerminatorId, String correlationString, byte[] correlationBinary) throws ClusterStoreAndForward.DataAccessException
- Specified by:
storeRequestCorrelation in interface ClusterStoreAndForward
- Throws:
ClusterStoreAndForward.DataAccessException
-
getAndDeleteRequestCorrelation
public Optional<ClusterStoreAndForward.RequestCorrelation> getAndDeleteRequestCorrelation(String matsSocketSessionId, String serverMessageId) throws ClusterStoreAndForward.DataAccessException
- Specified by:
getAndDeleteRequestCorrelation in interface ClusterStoreAndForward
- Throws:
ClusterStoreAndForward.DataAccessException
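Neither request-correlation method carries a description here, so the following is a guess based on the method names and signatures alone: a correlation is stored per session and Server MessageId, and can be read back exactly once. RequestCorrelationSketch and its string-only payload are hypothetical; the timestamp, replyTerminatorId and binary part are left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical in-memory model of request correlations; not the SQL-backed implementation. */
final class RequestCorrelationSketch {
    // Key: sessionId + '/' + serverMessageId; value: the correlation string.
    private final Map<String, String> correlations = new HashMap<>();

    /** Stores the correlation for a Server-to-Client request. */
    void store(String sessionId, String smid, String correlationString) {
        correlations.put(sessionId + '/' + smid, correlationString);
    }

    /** Returns the correlation and removes it, so a second call yields empty. */
    Optional<String> getAndDelete(String sessionId, String smid) {
        return Optional.ofNullable(correlations.remove(sessionId + '/' + smid));
    }
}
```

Reading and deleting in one step means a duplicate reply from the client finds nothing to correlate with, assuming that is the intent behind the name.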
-