diff --git a/.gitignore b/.gitignore index 0da2176..c892415 100644 --- a/.gitignore +++ b/.gitignore @@ -131,3 +131,5 @@ dmypy.json /samples/ConsoleSample/ConsoleSample-dev.py /samples/KleinWebAppSample/credentials-dev.json /samples/jupyter/credentials-dev.json +/samples/ConsoleSample/config-trade.json +/samples/ConsoleSample/config-quote.json diff --git a/ctrader_fix/client.py b/ctrader_fix/client.py index d542f71..b6c600e 100644 --- a/ctrader_fix/client.py +++ b/ctrader_fix/client.py @@ -2,20 +2,20 @@ from twisted.internet.endpoints import clientFromString from twisted.application.internet import ClientService -from messages import * -from factory import Factory from twisted.internet import reactor -from fixProtocol import FixProtocol from twisted.internet import reactor -import datetime +from ctrader_fix.messages import * +from ctrader_fix.factory import Factory +from ctrader_fix.fixProtocol import FixProtocol class Client(ClientService): - def __init__(self, host, port, ssl=False, retryPolicy=None, clock=None, prepareConnection=None, numberOfMessagesToSendPerSecond=5): + def __init__(self, host, port, ssl=False, delimiter = "", retryPolicy=None, clock=None, prepareConnection=None, numberOfMessagesToSendPerSecond=5): self._runningReactor = reactor self.numberOfMessagesToSendPerSecond = numberOfMessagesToSendPerSecond + self.delimiter = delimiter endpoint = clientFromString(self._runningReactor, f"ssl:{host}:{port}" if ssl else f"tcp:{host}:{port}") - factory = Factory.forProtocol(FixProtocol, client=self) - super().__init__(endpoint, factory, retryPolicy=retryPolicy, clock=clock, prepareConnection=prepareConnection) + self._factory = Factory.forProtocol(FixProtocol, client=self) + super().__init__(endpoint, self._factory, retryPolicy=retryPolicy, clock=clock, prepareConnection=prepareConnection) self._events = dict() self._responseDeferreds = dict() self.isConnected = False @@ -45,10 +45,17 @@ def _received(self, responseMessage): self._messageReceivedCallback(self, responseMessage) def send(self, requestMessage): + requestMessage.delimiter = self.delimiter diferred = self.whenConnected(failAfterFailures=1) diferred.addCallback(lambda protocol: protocol.send(requestMessage)) return diferred + def changeMessageSequenceNumber(self, newMessageSequenceNumber): + self._factory.messageSequenceNumber = newMessageSequenceNumber + + def getMessageSequenceNumber(self): + return self._factory.messageSequenceNumber + def setConnectedCallback(self, callback): self._connectedCallback = callback @@ -57,30 +64,3 @@ def setDisconnectedCallback(self, callback): def setMessageReceivedCallback(self, callback): self._messageReceivedCallback = callback - - -if __name__ == "__main__": - client = Client("h51.p.ctrader.com", 5202) - quoteSessionInfo = {"Username": 3279203, "Password": 3279203, "BeginString": "FIX.4.4", "SenderCompID": "demo.icmarkets.3279203", "SenderSubID": "QUOTE", "TargetCompID": "cServer", "TargetSubID": "QUOTE", "HeartBeat": 30} - tradeSessionInfo = {"Username": 3279203, "Password": 3279203, "BeginString": "FIX.4.4", "SenderCompID": "demo.icmarkets.3279203", "SenderSubID": "TRADE", "TargetCompID": "cServer", "TargetSubID": "TRADE", "HeartBeat": 30} - isMessageSent = False - - def onConnected(client): - logonMessage = LogonRequest(tradeSessionInfo) - client.send(logonMessage) - - def onMessageReceived(client, responseMessage): - print("Received: ", responseMessage.getMessage()) - global isMessageSent - if isMessageSent is True: - return - isMessageSent = True - request = OrderStatusRequest(tradeSessionInfo) - request.ClOrdID = "A" - client.send(request) - - client.setConnectedCallback(onConnected) - client.setMessageReceivedCallback(onMessageReceived) - client.startService() - - reactor.run() diff --git a/ctrader_fix/factory.py b/ctrader_fix/factory.py index 1fc8aab..289e47d 100644 --- a/ctrader_fix/factory.py +++ b/ctrader_fix/factory.py @@ -3,13 +3,15 @@ from twisted.internet.protocol import ClientFactory class Factory(ClientFactory): - def __init__(self, *args, **kwargs): - super().__init__() - self.client = kwargs['client'] - self.numberOfMessagesToSendPerSecond = self.client.numberOfMessagesToSendPerSecond - def connected(self): - self.client._connected() - def disconnected(self, reason): - self.client._disconnected(reason) - def received(self, message): - self.client._received(message) + messageSequenceNumber = 0 + def __init__(self, *args, **kwargs): + super().__init__() + self.client = kwargs['client'] + self.delimiter = self.client.delimiter + self.numberOfMessagesToSendPerSecond = self.client.numberOfMessagesToSendPerSecond + def connected(self): + self.client._connected() + def disconnected(self, reason): + self.client._disconnected(reason) + def received(self, message): + self.client._received(message) diff --git a/ctrader_fix/fixProtocol.py b/ctrader_fix/fixProtocol.py index a38e590..ca62698 100644 --- a/ctrader_fix/fixProtocol.py +++ b/ctrader_fix/fixProtocol.py @@ -1,11 +1,12 @@ #!/usr/bin/env python from twisted.internet.protocol import Protocol -from messages import ResponseMessage +from ctrader_fix.messages import ResponseMessage class FixProtocol(Protocol): - _messageSequenceNumber = 0 + _currentMessage = '' def connectionMade(self): + self.factory.messageSequenceNumber = 0 super().connectionMade() self.factory.connected() @@ -14,11 +15,14 @@ def connectionLost(self, reason): self.factory.disconnected(reason) def dataReceived(self, data): - responseMessage = ResponseMessage(data.decode("ascii")) - self.factory.received(responseMessage) + dataString = data.decode("ascii") + self._currentMessage += dataString + if f"{self.factory.delimiter}10=" in dataString and dataString.endswith(self.factory.delimiter): + responseMessage = ResponseMessage(self._currentMessage, self.factory.delimiter) + self._currentMessage = '' + self.factory.received(responseMessage) def send(self, requestMessage): - self._messageSequenceNumber += 1 - messageString = requestMessage.getMessage(self._messageSequenceNumber) - print("Sending: ", messageString) + self.factory.messageSequenceNumber += 1 + messageString = requestMessage.getMessage(self.factory.messageSequenceNumber) return self.transport.write(messageString.encode("ascii")) diff --git a/ctrader_fix/messages.py b/ctrader_fix/messages.py index b20bc8e..2bcec25 100644 --- a/ctrader_fix/messages.py +++ b/ctrader_fix/messages.py @@ -3,18 +3,21 @@ import datetime class ResponseMessage: - def __init__(self, message, delimiter = ""): + def __init__(self, message, delimiter): self._message = message.replace(delimiter, "|") - self._fields = {int(field.split("=")[0]):field.split("=")[1] for field in message.split(delimiter) if field != ""} - - def getFieldStr(self, fieldNumber): - return self._fields[fieldNumber] - - def getFieldInt(self, fieldNumber): - return int(self._fields[fieldNumber]) - - def getFieldFloat(self, fieldNumber): - return float(self._fields[fieldNumber]) + self._fields = [(int(field.split("=")[0]), field.split("=")[1]) for field in message.split(delimiter) if field != "" and "=" in field] + + def getFieldValue(self, fieldNumber): + result = [] + for field in self._fields: + if field[0] == fieldNumber: + result.append(field[1]) + lenResult = len(result) + if lenResult == 0: + return None + elif lenResult == 1: + return result[0] + return result def getMessageType(self): return self._fields[35] @@ -23,21 +26,20 @@ def getMessage(self): return self._message class RequestMessage: - def __init__(self, messageType, sessionInfo, delimiter = ""): + def __init__(self, messageType, config): self._type = messageType - self._sessionInfo = sessionInfo - self._delimiter = delimiter + self._config = config def getMessage(self, sequenceNumber): body = self._getBody() if body is not None: header = self._getHeader(len(body), sequenceNumber) - headerAndBody = f"{header}{self._delimiter}{body}{self._delimiter}" + headerAndBody = f"{header}{self.delimiter}{body}{self.delimiter}" else: header = self._getHeader(0, sequenceNumber) - headerAndBody = "{header}{self._delimiter}" + headerAndBody = f"{header}{self.delimiter}" trailer = self._getTrailer(headerAndBody) - return f"{headerAndBody}{trailer}{self._delimiter}" + return f"{headerAndBody}{trailer}{self.delimiter}" def _getBody(self): return None @@ -45,14 +47,14 @@ def _getBody(self): def _getHeader(self, lenBody, sequenceNumber): fields = [] fields.append(f"35={self._type}") - fields.append(f"49={self._sessionInfo['SenderCompID']}") - fields.append(f"56={self._sessionInfo['TargetCompID']}") - fields.append(f"57={self._sessionInfo['TargetSubID']}") - fields.append(f"50={self._sessionInfo['SenderSubID']}") + fields.append(f"49={self._config['SenderCompID']}") + fields.append(f"56={self._config['TargetCompID']}") + fields.append(f"57={self._config['TargetSubID']}") + fields.append(f"50={self._config['SenderSubID']}") fields.append(f"34={sequenceNumber}") fields.append(f"52={datetime.datetime.utcnow().strftime('%Y%m%d-%H:%M:%S')}") - fieldsJoined = self._delimiter.join(fields) - return f"8={self._sessionInfo['BeginString']}{self._delimiter}9={lenBody+len(fieldsJoined) + 2}{self._delimiter}{fieldsJoined}" + fieldsJoined = self.delimiter.join(fields) + return f"8={self._config['BeginString']}{self.delimiter}9={lenBody+len(fieldsJoined) + 2}{self.delimiter}{fieldsJoined}" def _getTrailer(self, headerAndBody): messageBytes = bytes(headerAndBody, "ascii") @@ -63,24 +65,24 @@ def _getTrailer(self, headerAndBody): return f"10={str(checksum).zfill(3)}" class LogonRequest(RequestMessage): - def __init__(self, sessionInfo, delimiter = ""): - super().__init__("A", sessionInfo, delimiter) + def __init__(self, config): + super().__init__("A", config) self.EncryptionScheme = 0 def _getBody(self): fields = [] fields.append(f"98={self.EncryptionScheme}") - fields.append(f"108={self._sessionInfo['HeartBeat']}") + fields.append(f"108={self._config['HeartBeat']}") if hasattr(self, "ResetSeqNum") and self.ResetSeqNum: fields.append(f"141=Y") - fields.append(f"553={self._sessionInfo['Username']}") - fields.append(f"554={self._sessionInfo['Password']}") - return f"{self._delimiter.join(fields)}" + fields.append(f"553={self._config['Username']}") + fields.append(f"554={self._config['Password']}") + return f"{self.delimiter.join(fields)}" class Heartbeat(RequestMessage): - def __init__(self, sessionInfo, delimiter = ""): - super().__init__("0", sessionInfo, delimiter) + def __init__(self, config): + super().__init__("0", config) def _getBody(self): if hasattr(self, "TestReqId") is False: @@ -88,40 +90,40 @@ def _getBody(self): return f"112={self.TestReqId}" class TestRequest(RequestMessage): - def __init__(self, sessionInfo, delimiter = ""): - super().__init__("1", sessionInfo, delimiter) + def __init__(self, config): + super().__init__("1", config) def _getBody(self): return f"112={self.TestReqId}" class LogoutRequest(RequestMessage): - def __init__(self, sessionInfo, delimiter = ""): - super().__init__("5", sessionInfo, delimiter) + def __init__(self, config): + super().__init__("5", config) class ResendRequest(RequestMessage): - def __init__(self, sessionInfo, delimiter = ""): - super().__init__("2", sessionInfo, delimiter) + def __init__(self, config): + super().__init__("2", config) def _getBody(self): fields = [] fields.append(f"7={self.BeginSeqNo}") fields.append(f"16={self.EndSeqNo}") - return f"{self._delimiter.join(fields)}" + return f"{self.delimiter.join(fields)}" class SequenceReset(RequestMessage): - def __init__(self, sessionInfo, delimiter = ""): - super().__init__("4", sessionInfo, delimiter) + def __init__(self, config): + super().__init__("4", config) def _getBody(self): fields = [] if hasattr(self, "GapFillFlag"): fields.append(f"123={self.GapFillFlag}") fields.append(f"36={self.NewSeqNo}") - return f"{self._delimiter.join(fields)}" + return f"{self.delimiter.join(fields)}" class MarketDataRequest(RequestMessage): - def __init__(self, sessionInfo, delimiter = ""): - super().__init__("V", sessionInfo, delimiter) + def __init__(self, config): + super().__init__("V", config) def _getBody(self): fields = [] @@ -134,18 +136,21 @@ def _getBody(self): fields.append(f"269={self.MDEntryType}") fields.append(f"146={self.NoRelatedSym}") fields.append(f"55={self.Symbol}") - return f"{self._delimiter.join(fields)}" + return f"{self.delimiter.join(fields)}" class NewOrderSingle(RequestMessage): - def __init__(self, sessionInfo, delimiter = ""): - super().__init__("D", sessionInfo, delimiter) + def __init__(self, config): + super().__init__("D", config) def _getBody(self): fields = [] fields.append(f"11={self.ClOrdID}") fields.append(f"55={self.Symbol}") fields.append(f"54={self.Side}") - fields.append(f"60={self.TransactTime.strftime('%Y%m%d-%H:%M:%S')}") + if hasattr(self, "TransactTime"): + fields.append(f"60={self.TransactTime.strftime('%Y%m%d-%H:%M:%S')}") + else: + fields.append(f"60={datetime.datetime.utcnow().strftime('%Y%m%d-%H:%M:%S')}") fields.append(f"38={self.OrderQty}") fields.append(f"40={self.OrdType}") if hasattr(self, "Price"): @@ -153,21 +158,86 @@ def _getBody(self): if hasattr(self, "StopPx"): fields.append(f"99={self.StopPx}") if hasattr(self, "ExpireTime"): - fields.append(f"126={self.ExpireTime.strftime('%Y%m%d-%H:%M:%S')}") + fields.append(f"126={self.ExpireTime.strftime('%Y%m%d-%H:%M:%S')}" if isinstance(self.ExpireTime, datetime.datetime) else f"126={self.ExpireTime}") if hasattr(self, "PosMaintRptID"): fields.append(f"721={self.PosMaintRptID}") if hasattr(self, "Designation"): fields.append(f"494={self.Designation}") - return f"{self._delimiter.join(fields)}" + return f"{self.delimiter.join(fields)}" class OrderStatusRequest(RequestMessage): - def __init__(self, sessionInfo, delimiter = ""): - super().__init__("H", sessionInfo, delimiter) + def __init__(self, config): + super().__init__("H", config) def _getBody(self): fields = [] fields.append(f"11={self.ClOrdID}") if hasattr(self, "Side"): fields.append(f"54={self.Side}") - return f"{self._delimiter.join(fields)}" + return f"{self.delimiter.join(fields)}" + +class OrderMassStatusRequest(RequestMessage): + def __init__(self, config): + super().__init__("AF", config) + + def _getBody(self): + fields = [] + fields.append(f"584={self.MassStatusReqID}") + fields.append(f"585={self.MassStatusReqType}") + if hasattr(self, "IssueDate"): + fields.append(f"225={self.IssueDate.strftime('%Y%m%d-%H:%M:%S')}") + return f"{self.delimiter.join(fields)}" + +class RequestForPositions(RequestMessage): + def __init__(self, config): + super().__init__("AN", config) + + def _getBody(self): + fields = [] + fields.append(f"710={self.PosReqID}") + if hasattr(self, "PosMaintRptID"): + fields.append(f"721={self.PosMaintRptID}") + return f"{self.delimiter.join(fields)}" + +class OrderCancelRequest(RequestMessage): + def __init__(self, config): + super().__init__("F", config) + + def _getBody(self): + fields = [] + fields.append(f"41={self.OrigClOrdID}") + if hasattr(self, "OrderID"): + fields.append(f"37={self.OrderID}") + fields.append(f"11={self.ClOrdID}") + return f"{self.delimiter.join(fields)}" + +class OrderCancelReplaceRequest(RequestMessage): + def __init__(self, config): + super().__init__("G", config) + + def _getBody(self): + fields = [] + fields.append(f"41={self.OrigClOrdID}") + if hasattr(self, "OrderID"): + fields.append(f"37={self.OrderID}") + fields.append(f"11={self.ClOrdID}") + fields.append(f"38={self.OrderQty}") + if hasattr(self, "Price"): + fields.append(f"44={self.Price}") + if hasattr(self, "StopPx"): + fields.append(f"99={self.StopPx}") + if hasattr(self, "ExpireTime"): + fields.append(f"126={self.ExpireTime.strftime('%Y%m%d-%H:%M:%S')}") + return f"{self.delimiter.join(fields)}" + +class SecurityListRequest(RequestMessage): + def __init__(self, config): + super().__init__("x", config) + def _getBody(self): + fields = [] + fields.append(f"320={self.SecurityReqID}") + fields.append(f"559={self.SecurityListRequestType}") + if hasattr(self, "Symbol"): + fields.append(f"55={self.Symbol}") + return f"{self.delimiter.join(fields)}" diff --git a/samples/ConsoleSample/README.md b/samples/ConsoleSample/README.md index e46ff8c..818b79d 100644 --- a/samples/ConsoleSample/README.md +++ b/samples/ConsoleSample/README.md @@ -1,6 +1,6 @@ # Console Sample -This is the console sample for Spotware OpenApiPy Python package. +This is the console sample for cTrader FIX API Python package. It uses a single thread which is the Python main execution thread for both getting user inputs and sending/receiving messages to/from API. diff --git a/samples/ConsoleSample/config.json b/samples/ConsoleSample/config.json new file mode 100644 index 0000000..50a6951 --- /dev/null +++ b/samples/ConsoleSample/config.json @@ -0,0 +1,13 @@ +{ + "Host": "", + "Port": 0, + "SSL": false, + "Username": "", + "Password": "", + "BeginString": "FIX.4.4", + "SenderCompID": "", + "SenderSubID": "QUOTE", + "TargetCompID": "cServer", + "TargetSubID": "QUOTE", + "HeartBeat": "30" +} diff --git a/samples/ConsoleSample/main.py b/samples/ConsoleSample/main.py index 0622ed9..a3f2766 100644 --- a/samples/ConsoleSample/main.py +++ b/samples/ConsoleSample/main.py @@ -1,295 +1,133 @@ #!/usr/bin/env python -from ctrader_open_api import Client, Protobuf, TcpProtocol, Auth, EndPoints -from ctrader_open_api.endpoints import EndPoints -from ctrader_open_api.messages.OpenApiCommonMessages_pb2 import * -from ctrader_open_api.messages.OpenApiCommonMessages_pb2 import * -from ctrader_open_api.messages.OpenApiMessages_pb2 import * -from ctrader_open_api.messages.OpenApiModelMessages_pb2 import * +import requests from twisted.internet import reactor from inputimeout import inputimeout, TimeoutOccurred import webbrowser import datetime - -if __name__ == "__main__": - currentAccountId = None - hostType = input("Host (Live/Demo): ") - hostType = hostType.lower() - - while hostType != "live" and hostType != "demo": - print(f"{hostType} is not a valid host type.") - hostType = input("Host (Live/Demo): ") - - appClientId = input("App Client ID: ") - appClientSecret = input("App Client Secret: ") - isTokenAvailable = input("Do you have an access token? (Y/N): ").lower() == "y" - - accessToken = None - if isTokenAvailable == False: - appRedirectUri = input("App Redirect URI: ") - auth = Auth(appClientId, appClientSecret, appRedirectUri) - authUri = auth.getAuthUri() - print(f"Please continue the authentication on your browser:\n {authUri}") - webbrowser.open_new(authUri) - print("\nThen enter the auth code that is appended to redirect URI immediatly (the code is after ?code= in URI)") - authCode = input("Auth Code: ") - token = auth.getToken(authCode) - if "accessToken" not in token: - raise KeyError(token) - print("Token: \n", token) - accessToken = token["accessToken"] - else: - accessToken = input("Access Token: ") - - client = Client(EndPoints.PROTOBUF_LIVE_HOST if hostType.lower() == "live" else EndPoints.PROTOBUF_DEMO_HOST, EndPoints.PROTOBUF_PORT, TcpProtocol) - - def connected(client): # Callback for client connection - print("\nConnected") - request = ProtoOAApplicationAuthReq() - request.clientId = appClientId - request.clientSecret = appClientSecret - deferred = client.send(request) - deferred.addErrback(onError) - - def disconnected(client, reason): # Callback for client disconnection - print("\nDisconnected: ", reason) - - def onMessageReceived(client, message): # Callback for receiving all messages - if message.payloadType in [ProtoOASubscribeSpotsRes().payloadType, ProtoOAAccountLogoutRes().payloadType, ProtoHeartbeatEvent().payloadType]: - return - elif message.payloadType == ProtoOAApplicationAuthRes().payloadType: - print("API Application authorized\n") - print("Please use setAccount command to set the authorized account before sending any other command, try help for more detail\n") - print("To get account IDs use ProtoOAGetAccountListByAccessTokenReq command") - if currentAccountId is not None: - sendProtoOAAccountAuthReq() - return - elif message.payloadType == ProtoOAAccountAuthRes().payloadType: - protoOAAccountAuthRes = Protobuf.extract(message) - print(f"Account {protoOAAccountAuthRes.ctidTraderAccountId} has been authorized\n") - print("This acccount will be used for all future requests\n") - print("You can change the account by using setAccount command") - else: - print("Message received: \n", Protobuf.extract(message)) +import json +from ctrader_fix import * + +# you can have two separate config files for QUOTE and TRADE +with open("config-trade.json") as configFile: + config = json.load(configFile) + +client = Client(config["Host"], config["Port"], ssl = config["SSL"]) + +def showHelp(): + print("Commands (Parameters with an * are required)") + print("You can't use QUOTE commands if your connection and session is TRADE and vice versa") + print("Command and parameter names are case-sensitive") + print("For date and time parameters please provide the value in '%Y%m%d-%H:%M:%S' Python format") + print("To get valid values for parameters please check the cTrader FIX Engine, Rules of Engagement PDF document") + print("\nCommon Commands (You can use on both TRADE and QUOTE):") + print("* LogonRequest: ResetSeqNum") + print("* LogoutRequest") + print("* Heartbeat: TestReqId") + print("* TestRequest: *TestReqId") + print("* ResendRequest: *BeginSeqNo *EndSeqNo") + print("* SequenceReset: *NewSeqNo GapFillFlag") + print("* SecurityListRequest: *SecurityReqID *SecurityListRequestType Symbol ") + print("\n") + print("QUOTE Commands:") + print("* MarketDataRequest: *MDReqID *SubscriptionRequestType *MarketDepth *NoMDEntryTypes *MDEntryType *NoRelatedSym *Symbol MDUpdateType") + print("\n") + print("TRADE Commands:") + print("* NewOrderSingle: *ClOrdID *Symbol *Side *OrderQty *OrdType Price StopPx ExpireTime PosMaintRptID Designation") + print("* OrderStatusRequest: *ClOrdID Side") + print("* OrderMassStatusRequest: *MassStatusReqID *MassStatusReqType IssueDate") + print("* RequestForPositions: *PosReqID PosMaintRptID") + print("* OrderCancelRequest: *OrigClOrdID *ClOrdID OrderID") + print("* OrderCancelReplaceRequest: *OrigClOrdID *ClOrdID *OrderQty OrderID Price StopPx ExpireTime") + print("\n") + print("Examples") + print("LogonRequest ResetSeqNum=Y") + print("SecurityListRequest SecurityReqID=a SecurityListRequestType=0") + print("MarketDataRequest MDReqID=a SubscriptionRequestType=1 MarketDepth=0 NoMDEntryTypes=1 MDEntryType=0 NoRelatedSym=1 Symbol=1") + print("NewOrderSingle ClOrdID=a Symbol=1 Side=2 OrderQty=10000 OrdType=1 Designation=Test") + print("NewOrderSingle ClOrdID=a Symbol=1 Side=2 OrderQty=10000 OrdType=3 StopPx=1.102 ExpireTime=20220410-12:11:10.437 Designation=Test") + print("OrderStatusRequest ClOrdID=a") + print("OrderMassStatusRequest MassStatusReqID=1 MassStatusReqType=7") + print("RequestForPositions PosReqID=a") + print("OrderCancelRequest OrigClOrdID=a ClOrdID=b") + print("OrderCancelReplaceRequest OrigClOrdID=a ClOrdID=c OrderQty=20000 Price=1.102") + reactor.callLater(3, callable=executeUserCommand) + +def setParameters(request, **kwargs): + for name, value in kwargs.items(): + setattr(request, name, value) + +def send(request): + diferred = client.send(request) + diferred.addCallback(lambda _: print("\nSent: ", request.getMessage(client.getMessageSequenceNumber()).replace("", "|"))) + +def logonRequest(**kwargs): + request = LogonRequest(config) + setParameters(request, **kwargs) + send(request) + +commands = { + "LogonRequest": LogonRequest, + "LogoutRequest": LogoutRequest, + "Heartbeat": Heartbeat, + "TestRequest": TestRequest, + "ResendRequest": ResendRequest, + "SequenceReset": SequenceReset, + "SecurityListRequest": SecurityListRequest, + "MarketDataRequest": MarketDataRequest, + "NewOrderSingle": NewOrderSingle, + "OrderStatusRequest": OrderStatusRequest, + "OrderMassStatusRequest": OrderMassStatusRequest, + "RequestForPositions": RequestForPositions, + "OrderCancelRequest": OrderCancelRequest, + "OrderCancelReplaceRequest": OrderCancelReplaceRequest} + +def executeUserCommand(): + try: + print("\n") + userInput = inputimeout("Command (ex: Help): ", timeout=30) + except TimeoutOccurred: + print("Command Input Timeout") reactor.callLater(3, callable=executeUserCommand) - - def onError(failure): # Call back for errors - print("Message Error: ", failure) + return + if userInput.lower() == "help": + showHelp() + return + userInputSplit = userInput.split(" ") + if not userInputSplit: + print("Command split error: ", userInput) reactor.callLater(3, callable=executeUserCommand) - - def showHelp(): - print("Commands (Parameters with an * are required), ignore the description inside ()") - print("setAccount(For all subsequent requests this account will be used) *accountId") - print("ProtoOAVersionReq clientMsgId") - print("ProtoOAGetAccountListByAccessTokenReq clientMsgId") - print("ProtoOAAssetListReq clientMsgId") - print("ProtoOAAssetClassListReq clientMsgId") - print("ProtoOASymbolCategoryListReq clientMsgId") - print("ProtoOASymbolsListReq includeArchivedSymbols(True/False) clientMsgId") - print("ProtoOATraderReq clientMsgId") - print("ProtoOASubscribeSpotsReq *symbolId *timeInSeconds(Unsubscribes after this time) subscribeToSpotTimestamp(True/False) clientMsgId") - print("ProtoOAReconcileReq clientMsgId") - print("ProtoOAGetTrendbarsReq *weeks *period *symbolId clientMsgId") - print("ProtoOAGetTickDataReq *days *type *symbolId clientMsgId") - print("NewMarketOrder *symbolId *tradeSide *volume clientMsgId") - print("NewLimitOrder *symbolId *tradeSide *volume *price clientMsgId") - print("NewStopOrder *symbolId *tradeSide *volume *price clientMsgId") - print("ClosePosition *positionId *volume clientMsgId") - print("CancelOrder *orderId clientMsgId") - + return + command = userInputSplit[0] + parameters = {} + try: + parameters = {parameter.split("=")[0]:parameter.split("=")[1] for parameter in userInputSplit[1:]} + except: + print("Invalid parameters: ", userInput) + reactor.callLater(3, callable=executeUserCommand) + if command in commands: + request = commands[command](config) + setParameters(request, **parameters) + send(request) + else: + print("Invalid Command: ", userInput) reactor.callLater(3, callable=executeUserCommand) - def setAccount(accountId): - global currentAccountId - if currentAccountId is not None: - sendProtoOAAccountLogoutReq() - currentAccountId = int(accountId) - sendProtoOAAccountAuthReq() - - def sendProtoOAVersionReq(clientMsgId = None): - request = ProtoOAVersionReq() - deferred = client.send(request, clientMsgId = clientMsgId) - deferred.addErrback(onError) - - def sendProtoOAGetAccountListByAccessTokenReq(clientMsgId = None): - request = ProtoOAGetAccountListByAccessTokenReq() - request.accessToken = accessToken - deferred = client.send(request, clientMsgId = clientMsgId) - deferred.addErrback(onError) - - def sendProtoOAAccountLogoutReq(clientMsgId = None): - request = ProtoOAAccountLogoutReq() - request.ctidTraderAccountId = currentAccountId - deferred = client.send(request, clientMsgId = clientMsgId) - deferred.addErrback(onError) - - def sendProtoOAAccountAuthReq(clientMsgId = None): - request = ProtoOAAccountAuthReq() - request.ctidTraderAccountId = currentAccountId - request.accessToken = accessToken - deferred = client.send(request, clientMsgId = clientMsgId) - deferred.addErrback(onError) - - def sendProtoOAAssetListReq(clientMsgId = None): - request = ProtoOAAssetListReq() - request.ctidTraderAccountId = currentAccountId - deferred = client.send(request, clientMsgId = clientMsgId) - deferred.addErrback(onError) - - def sendProtoOAAssetClassListReq(clientMsgId = None): - request = ProtoOAAssetClassListReq() - request.ctidTraderAccountId = currentAccountId - deferred = client.send(request, clientMsgId = clientMsgId) - deferred.addErrback(onError) - - def sendProtoOASymbolCategoryListReq(clientMsgId = None): - request = ProtoOASymbolCategoryListReq() - request.ctidTraderAccountId = currentAccountId - deferred = client.send(request, clientMsgId = clientMsgId) - deferred.addErrback(onError) - - def sendProtoOASymbolsListReq(includeArchivedSymbols = False, clientMsgId = None): - request = ProtoOASymbolsListReq() - request.ctidTraderAccountId = currentAccountId - request.includeArchivedSymbols = includeArchivedSymbols if type(includeArchivedSymbols) is bool else bool(includeArchivedSymbols) - deferred = client.send(request) - deferred.addErrback(onError) - - def sendProtoOATraderReq(clientMsgId = None): - request = ProtoOATraderReq() - request.ctidTraderAccountId = currentAccountId - deferred = client.send(request, clientMsgId = clientMsgId) - deferred.addErrback(onError) - - def sendProtoOAUnsubscribeSpotsReq(symbolId, clientMsgId = None): - request = ProtoOAUnsubscribeSpotsReq() - request.ctidTraderAccountId = currentAccountId - request.symbolId.append(int(symbolId)) - deferred = client.send(request, clientMsgId = clientMsgId) - deferred.addErrback(onError) - - def sendProtoOASubscribeSpotsReq(symbolId, timeInSeconds, subscribeToSpotTimestamp = False, clientMsgId = None): - request = ProtoOASubscribeSpotsReq() - request.ctidTraderAccountId = currentAccountId - request.symbolId.append(int(symbolId)) - request.subscribeToSpotTimestamp = subscribeToSpotTimestamp if type(subscribeToSpotTimestamp) is bool else bool(subscribeToSpotTimestamp) - deferred = client.send(request, clientMsgId = clientMsgId) - deferred.addErrback(onError) - reactor.callLater(int(timeInSeconds), sendProtoOAUnsubscribeSpotsReq, symbolId) - - def sendProtoOAReconcileReq(clientMsgId = None): - request = ProtoOAReconcileReq() - request.ctidTraderAccountId = currentAccountId - deferred = client.send(request, clientMsgId = clientMsgId) - deferred.addErrback(onError) - - def sendProtoOAGetTrendbarsReq(weeks, period, symbolId, clientMsgId = None): - request = ProtoOAGetTrendbarsReq() - request.ctidTraderAccountId = currentAccountId - request.period = ProtoOATrendbarPeriod.Value(period) - request.fromTimestamp = int((datetime.datetime.utcnow() - datetime.timedelta(weeks=int(weeks))).timestamp()) * 1000 - request.toTimestamp = int(datetime.datetime.utcnow().timestamp()) * 1000 - request.symbolId = int(symbolId) - deferred = client.send(request, clientMsgId = clientMsgId) - deferred.addErrback(onError) - - def sendProtoOAGetTickDataReq(days, quoteType, symbolId, clientMsgId = None): - request = ProtoOAGetTickDataReq() - request.ctidTraderAccountId = currentAccountId - request.type = ProtoOAQuoteType.Value(quoteType.upper()) - request.fromTimestamp = int((datetime.datetime.utcnow() - datetime.timedelta(days=int(days))).timestamp()) * 1000 - request.toTimestamp = int(datetime.datetime.utcnow().timestamp()) * 1000 - request.symbolId = int(symbolId) - deferred = client.send(request, clientMsgId = clientMsgId) - deferred.addErrback(onError) - - def sendProtoOANewOrderReq(symbolId, orderType, tradeSide, volume, price = None, clientMsgId = None): - request = ProtoOANewOrderReq() - request.ctidTraderAccountId = currentAccountId - request.symbolId = int(symbolId) - request.orderType = ProtoOAOrderType.Value(orderType.upper()) - request.tradeSide = ProtoOATradeSide.Value(tradeSide.upper()) - request.volume = int(volume) * 100 - if request.orderType == ProtoOAOrderType.LIMIT: - request.limitPrice = float(price) - elif request.orderType == ProtoOAOrderType.STOP: - request.stopPrice = float(price) - deferred = client.send(request, clientMsgId = clientMsgId) - deferred.addErrback(onError) - - def sendNewMarketOrder(symbolId, tradeSide, volume, clientMsgId = None): - sendProtoOANewOrderReq(symbolId, "MARKET", tradeSide, volume, clientMsgId = clientMsgId) - - def sendNewLimitOrder(symbolId, tradeSide, volume, price, clientMsgId = None): - sendProtoOANewOrderReq(symbolId, "LIMIT", tradeSide, volume, price, clientMsgId) - - def sendNewStopOrder(symbolId, tradeSide, volume, price, clientMsgId = None): - sendProtoOANewOrderReq(symbolId, "STOP", tradeSide, volume, price, clientMsgId) - - def sendProtoOAClosePositionReq(positionId, volume, clientMsgId = None): - request = ProtoOAClosePositionReq() - request.ctidTraderAccountId = currentAccountId - request.positionId = int(positionId) - request.volume = int(volume) * 100 - deferred = client.send(request, clientMsgId = clientMsgId) - deferred.addErrback(onError) - - def sendProtoOACancelOrderReq(orderId, clientMsgId = None): - request = ProtoOACancelOrderReq() - request.ctidTraderAccountId = currentAccountId - request.orderId = int(orderId) - deferred = client.send(request, clientMsgId = clientMsgId) - deferred.addErrback(onError) +def onMessageReceived(client, responseMessage): # Callback for receiving all messages + print("\nReceived: ", responseMessage.getMessage().replace("", "|")) + reactor.callLater(3, callable=executeUserCommand) - commands = { - "help": showHelp, - "setAccount": setAccount, - "ProtoOAVersionReq": sendProtoOAVersionReq, - "ProtoOAGetAccountListByAccessTokenReq": sendProtoOAGetAccountListByAccessTokenReq, - "ProtoOAAssetListReq": sendProtoOAAssetListReq, - "ProtoOAAssetClassListReq": sendProtoOAAssetClassListReq, - "ProtoOASymbolCategoryListReq": sendProtoOASymbolCategoryListReq, - "ProtoOASymbolsListReq": sendProtoOASymbolsListReq, - "ProtoOATraderReq": sendProtoOATraderReq, - "ProtoOASubscribeSpotsReq": sendProtoOASubscribeSpotsReq, - "ProtoOAReconcileReq": sendProtoOAReconcileReq, - "ProtoOAGetTrendbarsReq": sendProtoOAGetTrendbarsReq, - "ProtoOAGetTickDataReq": sendProtoOAGetTickDataReq, - "NewMarketOrder": sendNewMarketOrder, - "NewLimitOrder": sendNewLimitOrder, - "NewStopOrder": sendNewStopOrder, - "ClosePosition": sendProtoOAClosePositionReq, - "CancelOrder": sendProtoOACancelOrderReq} +def disconnected(client, reason): # Callback for client disconnection + print("\nDisconnected, reason: ", reason) - def executeUserCommand(): - try: - print("\n") - userInput = inputimeout("Command (ex help): ", timeout=18) - except TimeoutOccurred: - print("Command Input Timeout") - reactor.callLater(3, callable=executeUserCommand) - return - userInputSplit = userInput.split(" ") - if not userInputSplit: - print("Command split error: ", userInput) - reactor.callLater(3, callable=executeUserCommand) - return - command = userInputSplit[0] - try: - parameters = [parameter if parameter[0] != "*" else parameter[1:] for parameter in userInputSplit[1:]] - except: - print("Invalid parameters: ", userInput) - reactor.callLater(3, callable=executeUserCommand) - if command in commands: - commands[command](*parameters) - else: - print("Invalid Command: ", userInput) - reactor.callLater(3, callable=executeUserCommand) +def connected(client): + print("Connected") + executeUserCommand() - # Setting optional client callbacks - client.setConnectedCallback(connected) - client.setDisconnectedCallback(disconnected) - client.setMessageReceivedCallback(onMessageReceived) - # Starting the client service - client.startService() - reactor.run() +# Setting client callbacks +client.setConnectedCallback(connected) +client.setDisconnectedCallback(disconnected) +client.setMessageReceivedCallback(onMessageReceived) +# Starting the client service +client.startService() +reactor.run()