pysyncobj package

SyncObj

class pysyncobj.SyncObj(selfNode, otherNodes, conf=None, consumers=None, nodeClass=<class 'pysyncobj.node.TCPNode'>, transport=None, transportClass=<class 'pysyncobj.transport.TCPTransport'>)

Main SyncObj class. Inherit your own class from it.

Parameters:
  • selfNode (Node or str) – object representing this node, or the address of the current node's server as ‘host:port’
  • otherNodes (iterable of Node or iterable of str) – objects representing the other nodes, or the addresses of the partner nodes, e.g. [‘host1:port1’, ‘host2:port2’, …]
  • conf (SyncObjConf) – configuration object
  • consumers (list of SyncObjConsumer inherited objects) – objects to be replicated
  • nodeClass (class) – class used for representation of nodes
  • transport (Transport or None) – transport object; if None, transportClass is used to initialise such an object
  • transportClass (class) – the Transport subclass to be used for transferring messages to and from other nodes
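
As a minimal sketch of the inheritance pattern described above, the factory below defines a counter class derived from SyncObj. The names build_counter_class, ReplicatedCounter, increment and value are hypothetical, and the import is deferred into the factory so that the snippet loads even where pysyncobj is not installed.

```python
def build_counter_class():
    """Return a SyncObj subclass; pysyncobj is imported only when this is called."""
    from pysyncobj import SyncObj, replicated

    class ReplicatedCounter(SyncObj):
        def __init__(self, self_addr, partner_addrs, conf=None):
            # Addresses use the 'host:port' form accepted by selfNode/otherNodes.
            super().__init__(self_addr, partner_addrs, conf=conf)
            self._value = 0

        @replicated
        def increment(self, amount=1):
            # State-changing member, so it is marked as replicated.
            self._value += amount
            return self._value

        def value(self):
            # Plain local read; it does not modify state and is not replicated.
            return self._value

    return ReplicatedCounter
```

Assuming three hypothetical nodes on one host, a node could be created with build_counter_class()('localhost:4321', ['localhost:4322', 'localhost:4323']); calling increment(sync=True) on it would then block until a result is available.
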
addNodeToCluster(node, callback=None)

Add a single node to the cluster (dynamic membership change). Asynchronous. Wait until the node has been added successfully before adding the next one.

Parameters:
  • node (Node | str) – node object or ‘nodeHost:nodePort’
  • callback (function(FAIL_REASON, None)) – called on success or failure
destroy()

Correctly destroy SyncObj: stop autoTickThread, close connections, etc.

doTick(timeToWait=0.0)

Performs a single tick. Must be called manually if autoTick is disabled.

Parameters:
  • timeToWait (float) – maximum time to wait for the next tick. If zero, perform a single tick without waiting for new events; otherwise, wait for a new socket event and return.
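
When autoTick is disabled, the caller has to drive the ticks. The loop below is a sketch of one way to do that; run_ticks and its parameters are hypothetical names, and sync_obj is assumed to be any object exposing the doTick(timeToWait) method documented above.

```python
import time


def run_ticks(sync_obj, duration, time_to_wait=0.05):
    """Call doTick repeatedly for roughly `duration` seconds; return the tick count."""
    deadline = time.monotonic() + duration
    ticks = 0
    while time.monotonic() < deadline:
        # A non-zero time_to_wait lets doTick wait for a socket event
        # instead of spinning.
        sync_obj.doTick(time_to_wait)
        ticks += 1
    return ticks
```
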
forceLogCompaction()

Force log compaction to start (without waiting for the required time or the required number of entries).

getStatus()

Dumps various debug information about the cluster into a dict and returns it.

isReady()

Check whether the current node has completed its initial sync with the others and holds up-to-date data.

Returns: True if ready, False otherwise
Return type: bool
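
Since isReady() only reports the current state, a caller that needs to block until the node is synced has to poll it. The helper below is a sketch under that assumption; wait_until_ready, timeout and poll_interval are hypothetical names.

```python
import time


def wait_until_ready(sync_obj, timeout=10.0, poll_interval=0.1):
    """Poll isReady() until it returns True or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while True:
        if sync_obj.isReady():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)
```
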
printStatus()

Dumps various debug information about the cluster to the default logger.

removeNodeFromCluster(node, callback=None)

Remove a single node from the cluster (dynamic membership change). Asynchronous. Wait until the node has been removed successfully before making the next membership change.

Parameters:
  • node (Node | str) – node object or ‘nodeHost:nodePort’
  • callback – called on success or failure
setCodeVersion(newVersion, callback=None)

Switch to a new code version on all cluster nodes. Ensure that all cluster nodes have been updated first; otherwise they won’t be able to apply commands.

Parameters:
  • newVersion (int) – new code version
  • callback (function(FAIL_REASON, None)) – called on success or failure

waitBinded()

Waits until initialized (port bound). On success, simply returns. If binding still fails after conf.maxBindRetries attempts, raises SyncObjException.

waitReady()

Waits until the transport is ready for operation.

Raises: TransportNotReadyError – if the transport fails to get ready

replicated

pysyncobj.replicated(*decArgs, **decKwargs)

Replicated decorator. Use it to mark class members that modify the class state. The function will be called asynchronously. It accepts the following additional (optional) parameters:

  • ‘callback’: callback(result, failReason), where failReason is a FAIL_REASON value.
  • ‘sync’: True to block execution and wait for the result, False for an asynchronous call. If a callback is passed, the ‘sync’ option is ignored.
  • ‘timeout’: if ‘sync’ is enabled and no result is available within ‘timeout’ seconds, SyncObjException is raised.

These parameters are reserved and should not be used in kwargs of your replicated method.

Parameters:
  • func (function) – arbitrary class member
  • ver (int) – (optional) code version (for zero-downtime deployment)
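
The ‘callback’ form can be bridged to a blocking call when the caller wants the fail reason back as a value rather than relying on ‘sync’. The helper below is a sketch of that idea; call_and_wait is a hypothetical name, and replicated_method is assumed to be any callable that accepts the documented ‘callback’ keyword.

```python
import threading


def call_and_wait(replicated_method, *args, timeout=5.0):
    """Invoke a replicated method with a callback and block until it fires.

    Returns (result, fail_reason); raises TimeoutError if no callback arrives.
    """
    done = threading.Event()
    outcome = {}

    def on_done(result, fail_reason):
        # Signature follows the documented callback(result, failReason).
        outcome["result"] = result
        outcome["fail_reason"] = fail_reason
        done.set()

    replicated_method(*args, callback=on_done)
    if not done.wait(timeout):
        raise TimeoutError("no callback within %.1f s" % timeout)
    return outcome["result"], outcome["fail_reason"]
```
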

replicated_sync

pysyncobj.replicated_sync(*decArgs, **decKwargs)

SyncObjConf

class pysyncobj.SyncObjConf(**kwargs)

PySyncObj configuration object

appendEntriesBatchSizeBytes = None

Max number of bytes per single append_entries command.

appendEntriesPeriod = None

Interval between append_entries (ping) commands. Should be less than raftMinTimeout.

appendEntriesUseBatch = None

Send multiple entries in a single command. Enabled (default): improves overall performance (requests per second). Disabled: improves single-request speed (does not wait until a batch is ready).

autoTick = None

Disable autoTick if you want to call doTick manually. Otherwise it will be called automatically from a separate thread.

bindAddress = None

Bind address (address:port). Default: None. If None, selfAddress is used as bindAddress. Useful when selfAddress differs from bindAddress, e.g. with routers, NAT, port forwarding, etc.

bindRetryTime = None

Binding the port is retried every bindRetryTime seconds until it succeeds.

commandsQueueSize = None

Size of the commands queue, which stores commands before they are actually processed.

commandsWaitLeader = None

If true, commands will be enqueued and executed once a leader is detected. Otherwise, a FAIL_REASON.MISSING_LEADER error is emitted. The leader is missing while a connection is being established or while an election is in progress.

connectionRetryTime = None

Interval between connection attempts. Will try to connect to offline nodes each connectionRetryTime.

connectionTimeout = None

When no data is received for connectionTimeout, the connection is considered dead. Should be more than raftMaxTimeout.

deserializer = None

Custom deserialize function; it is called when restoring from fullDump. If specified, a custom serializer must be specified too. Should return data: the internal state that was passed to the serializer.

dnsCacheTime = None

Time to cache dns requests (improves performance, no need to resolve address for each connection attempt).

dnsFailCacheTime = None

Time to cache failed dns request.

dynamicMembershipChange = None

If enabled, the cluster configuration can be changed dynamically.

fullDumpFile = None

File to store the full serialized object. The full dump is saved to disk during log compaction. Set to None to disable storing.

journalFile = None

File to store operations journal. Save each record as soon as received.

leaderFallbackTimeout = None

When the leader gets no response from the majority of the cluster for leaderFallbackTimeout, it falls back to the follower state. Should be more than appendEntriesPeriod.

logCompactionBatchSize = None

Max number of bytes per single append_entries command while sending serialized object.

logCompactionMinEntries = None

The log is compacted once it reaches logCompactionMinEntries entries, or once logCompactionMinTime has passed since the previous compaction.

logCompactionMinTime = None

The log is compacted once it reaches logCompactionMinEntries entries, or once logCompactionMinTime has passed since the previous compaction.

logCompactionSplit = None

If true, each node starts log compaction in a separate time window, e.g. node1 in 12.00-12.10, node2 in 12.10-12.20, node3 in 12.20-12.30, then again node1 in 12.30-12.40, node2 in 12.40-12.50, etc.
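
The round-robin schedule in the example above can be sketched as follows. This only illustrates the pattern of the example, not how the library actually assigns windows; compaction_window_owner and the 10-minute window length are assumptions taken from the example times.

```python
def compaction_window_owner(minute_of_hour, node_count, window_minutes=10):
    """Return the zero-based index of the node whose window covers the minute."""
    if node_count <= 0:
        raise ValueError("node_count must be positive")
    slot = minute_of_hour // window_minutes
    return slot % node_count


# Three nodes: minutes 0, 10, 20 map to node1, node2, node3 (indices 0, 1, 2),
# then the cycle repeats from minute 30.
owners = [compaction_window_owner(m, 3) for m in (0, 10, 20, 30, 40, 50)]
```
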

maxBindRetries = None

Max number of attempts to bind port (default 0, unlimited).

onCodeVersionChanged = None

This callback is called when the cluster is switched to a new version. Arguments: onCodeVersionChanged(oldVer, newVer).

onReady = None

This callback is called as soon as SyncObj has synced all data from the leader.

onStateChanged = None

This callback will be called for every change of SyncObj state. Arguments: onStateChanged(oldState, newState). WARNING: there could be multiple leaders at the same time!

pollerType = None

Sockets poller:
  • auto - automatically select the best poller available on the current platform
  • select - use the select poller
  • poll - use the poll poller

preferredAddrType = None

Preferred address type. Default: ipv4.
  • None - no preference, select a random available address
  • ipv4 - prefer the ipv4 address type; if not available, use ipv6
  • ipv6 - prefer the ipv6 address type; if not available, use ipv4

raftMaxTimeout = None

Upper bound of the range from which the election timeout is randomly selected; see raftMinTimeout.

raftMinTimeout = None

After a randomly selected timeout (in the range from raftMinTimeout to raftMaxTimeout), the leader is considered dead and a leader election starts.
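
Several of the timing settings in this class are constrained relative to one another (appendEntriesPeriod, connectionTimeout, leaderFallbackTimeout and the two raft timeouts). The check below is a sketch that collects those stated relations in one place; check_timing is a hypothetical helper working on a plain dict, and the example values are illustrative, not the library defaults.

```python
def check_timing(conf):
    """Return a list of violated timing relations from a dict of settings (seconds)."""
    problems = []
    if not conf["raftMinTimeout"] <= conf["raftMaxTimeout"]:
        problems.append("raftMinTimeout must not exceed raftMaxTimeout")
    if not conf["appendEntriesPeriod"] < conf["raftMinTimeout"]:
        problems.append("appendEntriesPeriod should be less than raftMinTimeout")
    if not conf["connectionTimeout"] > conf["raftMaxTimeout"]:
        problems.append("connectionTimeout should be more than raftMaxTimeout")
    if not conf["leaderFallbackTimeout"] > conf["appendEntriesPeriod"]:
        problems.append("leaderFallbackTimeout should be more than appendEntriesPeriod")
    return problems


# Illustrative values chosen only to satisfy the relations above.
example = {
    "appendEntriesPeriod": 0.1,
    "raftMinTimeout": 0.5,
    "raftMaxTimeout": 1.5,
    "connectionTimeout": 4.0,
    "leaderFallbackTimeout": 30.0,
}
```

A dict that passes this check could then be handed to the configuration object as keyword arguments, since SyncObjConf takes **kwargs.
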

recvBufferSize = None

Size of receive buffer for sockets.

sendBufferSize = None

Size of send buffer for sockets.

serializeChecker = None

Check custom serialization state, for async serializer. Should return one of SERIALIZER_STATE.

serializer = None

Custom serialize function; it is called when log compaction (fullDump) happens. If specified, a custom deserializer must be specified too. Arguments: serializer(fileName, data), where data is internal state that must be serialized together with your object data.
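
A matching serializer/deserializer pair might look like the sketch below. It stores the object's own state next to the internal data in one pickle file. The factory names, the get_state/set_state callables and the payload layout are hypothetical, and the deserializer is assumed to receive the file name as its only argument, which this section does not state.

```python
import os
import pickle
import tempfile


def make_serializer(get_state):
    """Build serializer(fileName, data) that also stores get_state()'s result."""
    def serializer(fileName, data):
        payload = {"state": get_state(), "internal": data}
        directory = os.path.dirname(os.path.abspath(fileName))
        fd, tmp_path = tempfile.mkstemp(dir=directory)
        try:
            with os.fdopen(fd, "wb") as fh:
                pickle.dump(payload, fh, protocol=pickle.HIGHEST_PROTOCOL)
            # Rename into place so a partially written dump is never visible.
            os.replace(tmp_path, fileName)
        finally:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
    return serializer


def make_deserializer(set_state):
    """Build deserializer(fileName) that restores state and returns the internal data."""
    def deserializer(fileName):
        with open(fileName, "rb") as fh:
            payload = pickle.load(fh)
        set_state(payload["state"])
        return payload["internal"]
    return deserializer
```
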

useFork = None

Use fork if available when serializing on disk.

FAIL_REASON

class pysyncobj.FAIL_REASON
DISCARDED = 3

Command discarded (because a new leader was elected and another command was applied instead).

LEADER_CHANGED = 5

Similar to NOT_LEADER: the leader has changed without committing the command.

MISSING_LEADER = 2

Leader is currently missing (leader election in progress, or no connection)

NOT_LEADER = 4

Leader has changed, old leader did not have time to commit command.

QUEUE_FULL = 1

Commands queue full

REQUEST_DENIED = 6

Command denied

SUCCESS = 0

Command successfully applied.
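
A callback usually has to decide what to do with the fail reason it receives. The sketch below mirrors the numeric values listed above in a local enum and sorts them into three outcomes. FailReason, TRANSIENT and classify are hypothetical names, and the choice of which reasons count as retryable is an assumption, not library behaviour.

```python
from enum import IntEnum


class FailReason(IntEnum):
    """Local mirror of the documented FAIL_REASON values."""
    SUCCESS = 0
    QUEUE_FULL = 1
    MISSING_LEADER = 2
    DISCARDED = 3
    NOT_LEADER = 4
    LEADER_CHANGED = 5
    REQUEST_DENIED = 6


# Assumed policy: failures tied to leadership or queue pressure may clear on retry.
TRANSIENT = {
    FailReason.QUEUE_FULL,
    FailReason.MISSING_LEADER,
    FailReason.NOT_LEADER,
    FailReason.LEADER_CHANGED,
}


def classify(fail_reason):
    """Map a numeric fail reason to 'ok', 'retry', or 'give_up'."""
    reason = FailReason(fail_reason)
    if reason is FailReason.SUCCESS:
        return "ok"
    return "retry" if reason in TRANSIENT else "give_up"
```

Whether a retry is actually safe depends on the command being idempotent, so a policy like this should be adapted per method.
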

SERIALIZER_STATE

class pysyncobj.SERIALIZER_STATE
FAILED = 3

Serialization failed (should be returned only once after serialization has finished).

NOT_SERIALIZING = 0

Serialization not started or already finished.

SERIALIZING = 1

Serialization in progress.

SUCCESS = 2

Serialization successfully finished (should be returned only once after serialization has finished).
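
The "returned only once" rule for the terminal states can be sketched with a small wrapper that runs the write in a background thread and resets itself after reporting the result. BackgroundSerializer and its method names are hypothetical; the constants repeat the values listed above, and the serializer and checker methods are meant as candidates for the serializer and serializeChecker settings.

```python
import threading

# Values as listed for SERIALIZER_STATE above.
NOT_SERIALIZING, SERIALIZING, SUCCESS, FAILED = 0, 1, 2, 3


class BackgroundSerializer:
    """Runs a write function in a thread and reports each terminal state once."""

    def __init__(self, write_func):
        self._write = write_func
        self._lock = threading.Lock()
        self._state = NOT_SERIALIZING
        self._thread = None

    def serializer(self, fileName, data):
        with self._lock:
            self._state = SERIALIZING
        self._thread = threading.Thread(target=self._run, args=(fileName, data))
        self._thread.start()

    def _run(self, fileName, data):
        try:
            self._write(fileName, data)
            final = SUCCESS
        except Exception:
            final = FAILED
        with self._lock:
            self._state = final

    def checker(self):
        with self._lock:
            state = self._state
            if state in (SUCCESS, FAILED):
                # Report the terminal state once, then fall back to idle.
                self._state = NOT_SERIALIZING
            return state
```
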