public class ClusterStoreAndForward_SQL extends java.lang.Object implements ClusterStoreAndForward
ClusterStoreAndForward_SQL_DbMigrations
.
NOTE: This class relies heavily on Mats' MatsFactory.ContextLocal feature, whereby the current Mats StageProcessor-contextual SQL Connection is available using the MatsFactory.ContextLocal.getAttribute(Class, String...) method. This is because several of the methods on this interface will be invoked within a Mats initiation or Stage process lambda, and participating in the transactional demarcation established there is vital to achieve the guaranteed delivery and exactly-once delivery semantics.
Nested classes/interfaces inherited from interface ClusterStoreAndForward: ClusterStoreAndForward.CurrentNode, ClusterStoreAndForward.DataAccessException, ClusterStoreAndForward.MessageIdAlreadyExistsException, ClusterStoreAndForward.RequestCorrelation, ClusterStoreAndForward.SimpleCurrentNode, ClusterStoreAndForward.SimpleRequestCorrelation, ClusterStoreAndForward.SimpleStoredInMessage, ClusterStoreAndForward.SimpleStoredOutMessage, ClusterStoreAndForward.StoredInMessage, ClusterStoreAndForward.StoredOutMessage, ClusterStoreAndForward.WrongUserException
Modifier | Constructor and Description |
---|---|
protected | ClusterStoreAndForward_SQL(javax.sql.DataSource dataSource, java.lang.String nodename, java.time.Clock clock) |
Modifier and Type | Method and Description |
---|---|
void | boot(): Start the ClusterStoreAndForward, perform DB preparations and migrations. |
void | closeSession(java.lang.String matsSocketSessionId): Invoked when the client explicitly tells us that it has closed this session, CLOSE_SESSION. |
static ClusterStoreAndForward_SQL | create(javax.sql.DataSource dataSource, java.lang.String nodename) |
void | deleteMessageIdsFromInbox(java.lang.String matsSocketSessionId, java.util.Collection<java.lang.String> clientMessageIds): Deletes the incoming message Ids, as we've established that the client will never try to send this particular message again. |
void | deregisterSessionFromThisNode(java.lang.String matsSocketSessionId, java.lang.String connectionId): Deregisters a Session home when a WebSocket is closed. |
java.util.Optional<ClusterStoreAndForward.RequestCorrelation> | getAndDeleteRequestCorrelation(java.lang.String matsSocketSessionId, java.lang.String serverMessageId) |
java.util.Optional<ClusterStoreAndForward.CurrentNode> | getCurrentRegisteredNodeForSession(java.lang.String matsSocketSessionId) |
ClusterStoreAndForward.StoredInMessage | getMessageFromInbox(java.lang.String matsSocketSessionId, java.lang.String clientMessageId) |
java.util.List<ClusterStoreAndForward.StoredOutMessage> | getMessagesFromOutbox(java.lang.String matsSocketSessionId, int maxNumberOfMessages): Fetch a set of messages, up to 'maxNumberOfMessages' - but do not include messages that have been attempted delivered already (marked with ClusterStoreAndForward.outboxMessagesAttemptedDelivery(String, Collection)). |
java.util.List<MatsSocketServer.MatsSocketSessionDto> | getSessions(boolean onlyActive, java.lang.String userId, java.lang.String appName, java.lang.String appVersionAtOrAbove): Direct implementation of MatsSocketServer.getMatsSocketSessions(boolean, String, String, String) - go read there for semantics. |
int | getSessionsCount(boolean onlyActive, java.lang.String userId, java.lang.String appName, java.lang.String appVersionAtOrAbove): Direct implementation of MatsSocketServer.getMatsSocketSessionsCount(boolean, String, String, String) - go read there for semantics. |
boolean | isSessionExists(java.lang.String matsSocketSessionId) |
void | notifySessionLiveliness(java.util.Collection<java.lang.String> matsSocketSessionIds): Shall be invoked on some kind of schedule (e.g. |
void | outboxMessagesAttemptedDelivery(java.lang.String matsSocketSessionId, java.util.Collection<java.lang.String> serverMessageIds): Marks the specified messages as attempted delivered and notches the ClusterStoreAndForward.StoredOutMessage.getDeliveryCount() one up. |
void | outboxMessagesComplete(java.lang.String matsSocketSessionId, java.util.Collection<java.lang.String> serverMessageIds): States that the messages are delivered. |
void | outboxMessagesDeadLetterQueue(java.lang.String matsSocketSessionId, java.util.Collection<java.lang.String> serverMessageIds): States that the messages overran the accepted number of delivery attempts. |
void | outboxMessagesUnmarkAttemptedDelivery(java.lang.String matsSocketSessionId): When this method is invoked, ClusterStoreAndForward.getMessagesFromOutbox(String, int) will again return messages that have previously been marked as attempted delivered with ClusterStoreAndForward.outboxMessagesAttemptedDelivery(String, Collection). |
long | registerSessionAtThisNode(java.lang.String matsSocketSessionId, java.lang.String userId, java.lang.String connectionId, java.lang.String clientLibAndVersions, java.lang.String appName, java.lang.String appVersion): Registers a Session home to this node - only one node can ever be home, so any old is deleted. |
int | scavengeSessionRemnants(): Shall be invoked on some kind of schedule (e.g. |
void | storeMessageIdInInbox(java.lang.String matsSocketSessionId, java.lang.String clientMessageId): Stores the incoming message Id, to avoid double delivery. |
java.util.Optional<ClusterStoreAndForward.CurrentNode> | storeMessageInOutbox(java.lang.String matsSocketSessionId, java.lang.String serverMessageId, java.lang.String clientMessageId, java.lang.String traceId, MatsSocketServer.MessageType type, java.lang.Long requestTimestamp, java.lang.String envelope, java.lang.String messageJson, byte[] messageBinary): Stores the message for the Session, returning the nodename for the node holding the session, if any. |
void | storeRequestCorrelation(java.lang.String matsSocketSessionId, java.lang.String serverMessageId, long requestTimestamp, java.lang.String replyTerminatorId, java.lang.String correlationString, byte[] correlationBinary) |
java.util.Collection<java.lang.String> | timeoutSessions(long notLiveSinceTimestamp): Shall be invoked on some kind of schedule (e.g. |
void | updateMessageInInbox(java.lang.String matsSocketSessionId, java.lang.String clientMessageId, java.lang.String envelopeWithMessage, byte[] messageBinary): Stores the resulting message (envelope and binary), so that if the incoming message comes again (based on ClusterStoreAndForward.storeMessageIdInInbox(String, String) throwing ClusterStoreAndForward.MessageIdAlreadyExistsException), the result from the previous processing can be returned right away. |
protected ClusterStoreAndForward_SQL(javax.sql.DataSource dataSource, java.lang.String nodename, java.time.Clock clock)
public static ClusterStoreAndForward_SQL create(javax.sql.DataSource dataSource, java.lang.String nodename)
public void boot()
Description copied from interface: ClusterStoreAndForward
Start the ClusterStoreAndForward, perform DB preparations and migrations.
Specified by: boot in interface ClusterStoreAndForward
public long registerSessionAtThisNode(java.lang.String matsSocketSessionId, java.lang.String userId, java.lang.String connectionId, java.lang.String clientLibAndVersions, java.lang.String appName, java.lang.String appVersion) throws ClusterStoreAndForward.WrongUserException, ClusterStoreAndForward.DataAccessException
Description copied from interface: ClusterStoreAndForward
Registers a Session home to this node - only one node can ever be home, so any previous registration is deleted. A ClusterStoreAndForward.WrongUserException is thrown if the provided userId does not match the original userId that created the session.
Specified by: registerSessionAtThisNode in interface ClusterStoreAndForward
Parameters:
matsSocketSessionId - the SessionId for this connection.
userId - the UserId, as provided by AuthenticationPlugin, that owns this MatsSocketSessionId.
connectionId - an id that is unique for this specific WebSocket Session (i.e. TCP Connection), so that if it closes, a new registration will not be deregistered by the old MatsSocketSession realizing that it is closed and then invoking ClusterStoreAndForward.deregisterSessionFromThisNode(String, String).
clientLibAndVersions - the Client Library and Versions + runtime information for the Client.
appName - the AppName of the accessing Client app.
appVersion - the AppVersion of the accessing Client app.
Throws:
ClusterStoreAndForward.WrongUserException - if the userId provided does not match the original userId that created the session.
ClusterStoreAndForward.DataAccessException - if problems with underlying data store.
public void deregisterSessionFromThisNode(java.lang.String matsSocketSessionId, java.lang.String connectionId) throws ClusterStoreAndForward.DataAccessException
Description copied from interface: ClusterStoreAndForward
Deregisters a Session home when a WebSocket is closed.
Specified by: deregisterSessionFromThisNode in interface ClusterStoreAndForward
Parameters:
matsSocketSessionId - the MatsSocketSessionId for which to deregister the specific WebSocket Session's ConnectionId.
connectionId - an id that is unique for this specific WebSocket Session (i.e. TCP Connection), so that if it closes, a new registration will not be deregistered by the old MatsSocketSession realizing that it is closed and then invoking ClusterStoreAndForward.deregisterSessionFromThisNode(String, String).
Throws:
ClusterStoreAndForward.DataAccessException
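The role of the connectionId can be illustrated with a small in-memory sketch. This is not the SQL implementation: the class SessionHomeSketch, its Home type and its method names are hypothetical, and it assumes that a deregistration is simply ignored when the connectionId is not the currently registered one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** In-memory illustration only; the real class keeps this state in a SQL database. */
class SessionHomeSketch {
    /** Hypothetical record of where a session currently lives. */
    static final class Home {
        final String nodename;
        final String connectionId;

        Home(String nodename, String connectionId) {
            this.nodename = nodename;
            this.connectionId = connectionId;
        }
    }

    private final Map<String, Home> homes = new HashMap<>();

    /** Registering replaces any previous home: only one node can be home. */
    void register(String sessionId, String nodename, String connectionId) {
        homes.put(sessionId, new Home(nodename, connectionId));
    }

    /** Assumption: deregisters only if connectionId is the one currently registered. */
    void deregister(String sessionId, String connectionId) {
        Home current = homes.get(sessionId);
        if (current != null && current.connectionId.equals(connectionId)) {
            homes.remove(sessionId);
        }
    }

    Optional<String> currentNode(String sessionId) {
        return Optional.ofNullable(homes.get(sessionId)).map(h -> h.nodename);
    }

    public static void main(String[] args) {
        SessionHomeSketch store = new SessionHomeSketch();
        store.register("session-1", "node-A", "conn-old");
        // The client reconnects before the old WebSocket has noticed that it is closed.
        store.register("session-1", "node-B", "conn-new");
        // The late deregistration from the old connection must not remove the new home.
        store.deregister("session-1", "conn-old");
        System.out.println(store.currentNode("session-1").orElse("<none>"));
    }
}
```

Keying the deregistration on the connectionId rather than only on the session id is what keeps a slow close of the old connection from wiping out a fresh registration.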
public java.util.Optional<ClusterStoreAndForward.CurrentNode> getCurrentRegisteredNodeForSession(java.lang.String matsSocketSessionId) throws ClusterStoreAndForward.DataAccessException
Specified by: getCurrentRegisteredNodeForSession in interface ClusterStoreAndForward
Parameters:
matsSocketSessionId - the MatsSocketSessionId for which to find current session home.
Throws:
ClusterStoreAndForward.DataAccessException
public boolean isSessionExists(java.lang.String matsSocketSessionId) throws ClusterStoreAndForward.DataAccessException
Specified by: isSessionExists in interface ClusterStoreAndForward
Parameters:
matsSocketSessionId - the MatsSocketSessionId for which to check whether there is a session.
Throws:
ClusterStoreAndForward.DataAccessException
public java.util.List<MatsSocketServer.MatsSocketSessionDto> getSessions(boolean onlyActive, java.lang.String userId, java.lang.String appName, java.lang.String appVersionAtOrAbove) throws ClusterStoreAndForward.DataAccessException
Description copied from interface: ClusterStoreAndForward
Direct implementation of MatsSocketServer.getMatsSocketSessions(boolean, String, String, String) - go read there for semantics.
Specified by: getSessions in interface ClusterStoreAndForward
Returns: [...] data store.
Throws:
ClusterStoreAndForward.DataAccessException
public int getSessionsCount(boolean onlyActive, java.lang.String userId, java.lang.String appName, java.lang.String appVersionAtOrAbove) throws ClusterStoreAndForward.DataAccessException
Description copied from interface: ClusterStoreAndForward
Direct implementation of MatsSocketServer.getMatsSocketSessionsCount(boolean, String, String, String) - go read there for semantics.
Specified by: getSessionsCount in interface ClusterStoreAndForward
Returns: [...] data store.
Throws:
ClusterStoreAndForward.DataAccessException
public void closeSession(java.lang.String matsSocketSessionId) throws ClusterStoreAndForward.DataAccessException
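Description copied from interface: ClusterStoreAndForward
Invoked when the client explicitly tells us that it has closed this session, CLOSE_SESSION.
Specified by: closeSession in interface ClusterStoreAndForward
Parameters:
matsSocketSessionId - the MatsSocketSessionId that should be closed.
Throws:
ClusterStoreAndForward.DataAccessException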
public void notifySessionLiveliness(java.util.Collection<java.lang.String> matsSocketSessionIds) throws ClusterStoreAndForward.DataAccessException
Description copied from interface: ClusterStoreAndForward
Shall be invoked on some kind of schedule (e.g. [...]) [...] ClusterStoreAndForward about which Sessions are currently live on that node. Sessions that aren't live will be scavenged after some time by invocation of ClusterStoreAndForward.timeoutSessions(long).
Specified by: notifySessionLiveliness in interface ClusterStoreAndForward
Parameters:
matsSocketSessionIds - which sessions are currently live on the invoking node.
Throws:
ClusterStoreAndForward.DataAccessException
public java.util.Collection<java.lang.String> timeoutSessions(long notLiveSinceTimestamp) throws ClusterStoreAndForward.DataAccessException
Description copied from interface: ClusterStoreAndForward
Shall be invoked on some kind of schedule (e.g. [...]) [...] notified about liveliness since the supplied timestamp (millis from epoch).
Specified by: timeoutSessions in interface ClusterStoreAndForward
Parameters:
notLiveSinceTimestamp - decides which sessions are too old: Sessions which have not been notified about liveliness since this timestamp will be deleted.
Throws:
ClusterStoreAndForward.DataAccessException
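How notifySessionLiveliness and timeoutSessions interact can be sketched in memory. The class LivelinessSketch is hypothetical and is not how the SQL implementation stores its data. It also makes two assumptions: the current time is passed in explicitly, and the returned collection holds the ids of the sessions that were deleted.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** In-memory illustration of liveliness bookkeeping; not the SQL implementation. */
class LivelinessSketch {
    // sessionId -> millis-from-epoch timestamp of the last liveliness notification
    private final Map<String, Long> lastLiveliness = new HashMap<>();

    /** Each node reports the sessions it currently holds live. */
    void notifySessionLiveliness(Collection<String> sessionIds, long nowMillis) {
        for (String sessionId : sessionIds) {
            lastLiveliness.put(sessionId, nowMillis);
        }
    }

    /** Deletes sessions not notified about liveliness since the timestamp; returns their ids. */
    Collection<String> timeoutSessions(long notLiveSinceTimestamp) {
        List<String> timedOut = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastLiveliness.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() < notLiveSinceTimestamp) {
                timedOut.add(entry.getKey());
                it.remove();
            }
        }
        return timedOut;
    }

    public static void main(String[] args) {
        LivelinessSketch store = new LivelinessSketch();
        store.notifySessionLiveliness(List.of("session-a", "session-b"), 1_000L);
        store.notifySessionLiveliness(List.of("session-b"), 5_000L);
        // Only session-a has gone without a liveliness notification since t=3000.
        System.out.println(store.timeoutSessions(3_000L));
    }
}
```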
public int scavengeSessionRemnants() throws ClusterStoreAndForward.DataAccessException
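Description copied from interface: ClusterStoreAndForward
Shall be invoked on some kind of schedule (e.g. [...]
Specified by: scavengeSessionRemnants in interface ClusterStoreAndForward
Throws:
ClusterStoreAndForward.DataAccessException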
public void storeMessageIdInInbox(java.lang.String matsSocketSessionId, java.lang.String clientMessageId) throws ClusterStoreAndForward.MessageIdAlreadyExistsException, ClusterStoreAndForward.DataAccessException
Description copied from interface: ClusterStoreAndForward
Stores the incoming message Id, to avoid double delivery. If the Client MessageId already exists for this SessionId, a ClusterStoreAndForward.MessageIdAlreadyExistsException will be raised - this implies that a double delivery has occurred.
Specified by: storeMessageIdInInbox in interface ClusterStoreAndForward
Parameters:
matsSocketSessionId - the MatsSocketSessionId for which to store the incoming message id.
clientMessageId - the client's message Id for the incoming message.
Throws:
ClusterStoreAndForward.MessageIdAlreadyExistsException - if the Client MessageId (cmid) already existed for this SessionId.
ClusterStoreAndForward.DataAccessException
public void updateMessageInInbox(java.lang.String matsSocketSessionId, java.lang.String clientMessageId, java.lang.String envelopeWithMessage, byte[] messageBinary) throws ClusterStoreAndForward.DataAccessException
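Description copied from interface: ClusterStoreAndForward
Stores the resulting message (envelope and binary), so that if the incoming message comes again (based on ClusterStoreAndForward.storeMessageIdInInbox(String, String) throwing ClusterStoreAndForward.MessageIdAlreadyExistsException), the result from the previous processing can be returned right away.
Specified by: updateMessageInInbox in interface ClusterStoreAndForward
Throws:
ClusterStoreAndForward.DataAccessException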
public ClusterStoreAndForward.StoredInMessage getMessageFromInbox(java.lang.String matsSocketSessionId, java.lang.String clientMessageId) throws ClusterStoreAndForward.DataAccessException
Specified by: getMessageFromInbox in interface ClusterStoreAndForward
Throws:
ClusterStoreAndForward.DataAccessException
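The three inbox methods above together support a deduplication flow, which the following in-memory sketch tries to illustrate. Everything in it is hypothetical: InboxDedupSketch, its nested exception (a stand-in for ClusterStoreAndForward.MessageIdAlreadyExistsException), the simplified String-only signatures and the handleIncoming helper are not part of the library.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory illustration of the inbox contract; not the SQL implementation. */
class InboxDedupSketch {
    /** Stand-in for ClusterStoreAndForward.MessageIdAlreadyExistsException. */
    static class MessageIdAlreadyExistsException extends Exception {
        MessageIdAlreadyExistsException(String message) {
            super(message);
        }
    }

    // Key: sessionId + "/" + clientMessageId. Value: stored result, null until processing is done.
    private final Map<String, String> inbox = new HashMap<>();
    int timesProcessed = 0;

    void storeMessageIdInInbox(String sessionId, String cmid) throws MessageIdAlreadyExistsException {
        String key = sessionId + "/" + cmid;
        if (inbox.containsKey(key)) {
            throw new MessageIdAlreadyExistsException("Duplicate cmid: " + key);
        }
        inbox.put(key, null);
    }

    void updateMessageInInbox(String sessionId, String cmid, String envelopeWithMessage) {
        inbox.put(sessionId + "/" + cmid, envelopeWithMessage);
    }

    String getMessageFromInbox(String sessionId, String cmid) {
        return inbox.get(sessionId + "/" + cmid);
    }

    /** Handles an incoming message, doing the actual processing at most once per cmid. */
    String handleIncoming(String sessionId, String cmid, String payload) {
        try {
            storeMessageIdInInbox(sessionId, cmid);
        } catch (MessageIdAlreadyExistsException e) {
            // Double delivery: return what the first processing produced.
            return getMessageFromInbox(sessionId, cmid);
        }
        timesProcessed++;
        String reply = "reply-to:" + payload; // placeholder for the real processing
        updateMessageInInbox(sessionId, cmid, reply);
        return reply;
    }

    public static void main(String[] args) {
        InboxDedupSketch inbox = new InboxDedupSketch();
        System.out.println(inbox.handleIncoming("session-1", "cmid-1", "hello"));
        System.out.println(inbox.handleIncoming("session-1", "cmid-1", "hello"));
        System.out.println("processed " + inbox.timesProcessed + " time(s)");
    }
}
```

In this sketch a redelivery that arrives before the first processing has stored its result would get null back; how the real implementation handles that window is not stated here.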
public void deleteMessageIdsFromInbox(java.lang.String matsSocketSessionId, java.util.Collection<java.lang.String> clientMessageIds) throws ClusterStoreAndForward.DataAccessException
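Description copied from interface: ClusterStoreAndForward
Deletes the incoming message Ids, as we've established that the client will never try to send this particular message again.
Specified by: deleteMessageIdsFromInbox in interface ClusterStoreAndForward
Parameters:
matsSocketSessionId - the MatsSocketSessionId for which to delete the incoming message id.
clientMessageIds - the client's message Ids for the incoming messages to delete.
Throws:
ClusterStoreAndForward.DataAccessException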
public java.util.Optional<ClusterStoreAndForward.CurrentNode> storeMessageInOutbox(java.lang.String matsSocketSessionId, java.lang.String serverMessageId, java.lang.String clientMessageId, java.lang.String traceId, MatsSocketServer.MessageType type, java.lang.Long requestTimestamp, java.lang.String envelope, java.lang.String messageJson, byte[] messageBinary) throws ClusterStoreAndForward.DataAccessException, ClusterStoreAndForward.MessageIdAlreadyExistsException
Description copied from interface: ClusterStoreAndForward
Stores the message for the Session, returning the nodename for the node holding the session, if any. Throws ClusterStoreAndForward.MessageIdAlreadyExistsException if the unique constraint fails. In this case, a new Server MessageId should be picked and the call tried again.
Specified by: storeMessageInOutbox in interface ClusterStoreAndForward
Parameters:
matsSocketSessionId - the matsSocketSessionId that the message is meant for.
serverMessageId - unique id for outbox'ed message within the MatsSocketSessionId. Must be long enough that it is extremely unlikely that it collides with another message within the same MatsSocketSessionId - but do remember that there will at any one time be pretty few messages in the outbox for a given MatsSocketSession.
clientMessageId - For Client Replies to requests from Server-to-Client, this is the Client's Message Id, which we need to send back with the Reply so that the Client can correlate the Request. (Nullable, since not all messages are replies).
traceId - the server-side traceId for this message.
type - the type of the outgoing message (RESOLVE, REJECT or PUB).
requestTimestamp - For Client Replies to requests from Server-to-Client, this is the timestamp we received the Request.
envelope - the JSON-serialized MatsSocket Envelope without the 'msg' field set.
messageJson - the JSON-serialized message - the piece that sits in the 'msg' field of the Envelope. (Nullable)
messageBinary - the binary part of an outgoing message. (Nullable)
Throws:
ClusterStoreAndForward.MessageIdAlreadyExistsException - if the Server MessageId (smid) already existed for this SessionId.
ClusterStoreAndForward.DataAccessException
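The advice to pick a new Server MessageId and try again on a collision could look roughly like the sketch below. OutboxIdRetrySketch, its nested exception, the reduced two-argument storeMessageInOutbox and the storeWithFreshId helper are all hypothetical, and the bounded attempt count is an assumption rather than documented behaviour.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

/** In-memory illustration of retrying on a Server MessageId collision. */
class OutboxIdRetrySketch {
    /** Stand-in for ClusterStoreAndForward.MessageIdAlreadyExistsException. */
    static class MessageIdAlreadyExistsException extends Exception {
    }

    // Plays the role of the unique constraint on (sessionId, serverMessageId).
    private final Set<String> usedIds = new HashSet<>();

    void storeMessageInOutbox(String sessionId, String smid) throws MessageIdAlreadyExistsException {
        if (!usedIds.add(sessionId + "/" + smid)) {
            throw new MessageIdAlreadyExistsException();
        }
    }

    /** Tries freshly generated ids until one is accepted; returns the id that was stored. */
    String storeWithFreshId(String sessionId, Supplier<String> idGenerator, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            String smid = idGenerator.get();
            try {
                storeMessageInOutbox(sessionId, smid);
                return smid;
            } catch (MessageIdAlreadyExistsException e) {
                // Collision within this session: loop around and pick a new id.
            }
        }
        throw new IllegalStateException("No free Server MessageId after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        OutboxIdRetrySketch outbox = new OutboxIdRetrySketch();
        // A deliberately colliding id sequence, to exercise the retry path.
        Iterator<String> ids = List.of("abc", "abc", "def").iterator();
        System.out.println(outbox.storeWithFreshId("session-1", ids::next, 3));
        System.out.println(outbox.storeWithFreshId("session-1", ids::next, 3));
    }
}
```

Because the id only has to be unique within one MatsSocketSessionId, and few messages sit in a session's outbox at any one time, collisions should be rare and a small retry bound is likely sufficient.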
public java.util.List<ClusterStoreAndForward.StoredOutMessage> getMessagesFromOutbox(java.lang.String matsSocketSessionId, int maxNumberOfMessages) throws ClusterStoreAndForward.DataAccessException
Description copied from interface: ClusterStoreAndForward
Fetch a set of messages, up to 'maxNumberOfMessages' - but do not include messages that have been attempted delivered already (marked with ClusterStoreAndForward.outboxMessagesAttemptedDelivery(String, Collection)). If ClusterStoreAndForward.outboxMessagesUnmarkAttemptedDelivery(String) is invoked, that mark will be unset.
Specified by: getMessagesFromOutbox in interface ClusterStoreAndForward
Parameters:
matsSocketSessionId - the matsSocketSessionId that the message is meant for.
maxNumberOfMessages - the maximum number of messages to fetch.
Throws:
ClusterStoreAndForward.DataAccessException
public void outboxMessagesAttemptedDelivery(java.lang.String matsSocketSessionId, java.util.Collection<java.lang.String> serverMessageIds) throws ClusterStoreAndForward.DataAccessException
Description copied from interface: ClusterStoreAndForward
Marks the specified messages as attempted delivered and notches the ClusterStoreAndForward.StoredOutMessage.getDeliveryCount() one up. If ClusterStoreAndForward.outboxMessagesUnmarkAttemptedDelivery(String) is invoked (typically on reconnect), the mark will be unset, but the delivery count will stay in place - this is to be able to abort delivery attempts if there is something wrong with the message.
Specified by: outboxMessagesAttemptedDelivery in interface ClusterStoreAndForward
Parameters:
matsSocketSessionId - the matsSocketSessionId that the serverMessageIds refers to.
serverMessageIds - which messages failed delivery.
Throws:
ClusterStoreAndForward.DataAccessException
public void outboxMessagesUnmarkAttemptedDelivery(java.lang.String matsSocketSessionId) throws ClusterStoreAndForward.DataAccessException
Description copied from interface: ClusterStoreAndForward
When this method is invoked, ClusterStoreAndForward.getMessagesFromOutbox(String, int) will again return messages that have previously been marked as attempted delivered with ClusterStoreAndForward.outboxMessagesAttemptedDelivery(String, Collection).
Notice that the ClusterStoreAndForward.StoredOutMessage.getDeliveryCount() will not be reset, but the ClusterStoreAndForward.StoredOutMessage.getAttemptTimestamp() will now again return null.
Specified by: outboxMessagesUnmarkAttemptedDelivery in interface ClusterStoreAndForward
Parameters:
matsSocketSessionId - the matsSocketSessionId whose messages now shall be attempted.
Throws:
ClusterStoreAndForward.DataAccessException
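The interplay between fetching, marking and unmarking outbox messages is easier to follow in code. The sketch below models a single session's outbox in memory. OutboxAttemptSketch and its OutMessage type are hypothetical, the session id parameter is left out, and the time is passed in explicitly as an assumption.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory illustration of one session's outbox; not the SQL implementation. */
class OutboxAttemptSketch {
    /** Hypothetical, reduced stand-in for ClusterStoreAndForward.StoredOutMessage. */
    static final class OutMessage {
        final String serverMessageId;
        Long attemptTimestamp; // null means "not currently marked as attempted delivered"
        int deliveryCount;

        OutMessage(String serverMessageId) {
            this.serverMessageId = serverMessageId;
        }
    }

    private final Map<String, OutMessage> outbox = new LinkedHashMap<>();

    void store(String serverMessageId) {
        outbox.put(serverMessageId, new OutMessage(serverMessageId));
    }

    /** Returns up to max messages, skipping those marked as attempted delivered. */
    List<OutMessage> getMessagesFromOutbox(int max) {
        List<OutMessage> result = new ArrayList<>();
        for (OutMessage message : outbox.values()) {
            if (result.size() >= max) {
                break;
            }
            if (message.attemptTimestamp == null) {
                result.add(message);
            }
        }
        return result;
    }

    /** Sets the mark and notches the delivery count one up. */
    void outboxMessagesAttemptedDelivery(Collection<String> serverMessageIds, long nowMillis) {
        for (String smid : serverMessageIds) {
            OutMessage message = outbox.get(smid);
            if (message != null) {
                message.attemptTimestamp = nowMillis;
                message.deliveryCount++;
            }
        }
    }

    /** Clears the mark on all messages, but leaves the delivery counts in place. */
    void outboxMessagesUnmarkAttemptedDelivery() {
        for (OutMessage message : outbox.values()) {
            message.attemptTimestamp = null;
        }
    }

    /** Delivered messages are removed from the outbox. */
    void outboxMessagesComplete(Collection<String> serverMessageIds) {
        outbox.keySet().removeAll(serverMessageIds);
    }

    public static void main(String[] args) {
        OutboxAttemptSketch outbox = new OutboxAttemptSketch();
        outbox.store("smid-1");
        outbox.store("smid-2");
        outbox.outboxMessagesAttemptedDelivery(List.of("smid-1"), 100L);
        System.out.println("Fetchable while marked: " + outbox.getMessagesFromOutbox(10).size());
        outbox.outboxMessagesUnmarkAttemptedDelivery(); // e.g. on reconnect
        System.out.println("Fetchable after unmark: " + outbox.getMessagesFromOutbox(10).size());
    }
}
```

Since the delivery count survives the unmark, a caller can compare it against some limit of its own choosing and hand repeatedly failing messages to outboxMessagesDeadLetterQueue instead of retrying forever.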
public void outboxMessagesComplete(java.lang.String matsSocketSessionId, java.util.Collection<java.lang.String> serverMessageIds) throws ClusterStoreAndForward.DataAccessException
Description copied from interface: ClusterStoreAndForward
States that the messages are delivered.
Specified by: outboxMessagesComplete in interface ClusterStoreAndForward
Parameters:
matsSocketSessionId - the matsSocketSessionId that the serverMessageIds refers to.
serverMessageIds - which messages are complete.
Throws:
ClusterStoreAndForward.DataAccessException
public void outboxMessagesDeadLetterQueue(java.lang.String matsSocketSessionId, java.util.Collection<java.lang.String> serverMessageIds) throws ClusterStoreAndForward.DataAccessException
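Description copied from interface: ClusterStoreAndForward
States that the messages overran the accepted number of delivery attempts.
Specified by: outboxMessagesDeadLetterQueue in interface ClusterStoreAndForward
Parameters:
matsSocketSessionId - the matsSocketSessionId that the serverMessageIds refers to.
serverMessageIds - which messages should be DLQed.
Throws:
ClusterStoreAndForward.DataAccessException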
public void storeRequestCorrelation(java.lang.String matsSocketSessionId, java.lang.String serverMessageId, long requestTimestamp, java.lang.String replyTerminatorId, java.lang.String correlationString, byte[] correlationBinary) throws ClusterStoreAndForward.DataAccessException
Specified by: storeRequestCorrelation in interface ClusterStoreAndForward
Throws:
ClusterStoreAndForward.DataAccessException
public java.util.Optional<ClusterStoreAndForward.RequestCorrelation> getAndDeleteRequestCorrelation(java.lang.String matsSocketSessionId, java.lang.String serverMessageId) throws ClusterStoreAndForward.DataAccessException
Specified by: getAndDeleteRequestCorrelation in interface ClusterStoreAndForward
Throws:
ClusterStoreAndForward.DataAccessException