import asyncio
import hashlib
import hmac
import json
import os
import pathlib
import random
import ssl
import time
from urllib.parse import urljoin
import aiolimiter
import async_timeout
import httpx
import websockets
RATE_LIMITS = {
# order methods
(
"private/create-order",
"private/cancel-order",
"private/cancel-all-orders",
): (14, 0.1),
# order detail methods
("private/get-order-detail",): (29, 0.1),
# general trade methods
("private/get-trades", "private/get-order-history"): (1, 1),
}
[docs]
class ApiError(Exception):
pass
class ApiAuthError(ApiError):
pass
def params_to_str(obj, level):
if isinstance(obj, str):
return obj
if level >= 3:
return str(obj)
return_str = ""
for key in sorted(obj):
return_str += key
if isinstance(obj[key], list):
for subObj in obj[key]:
return_str += str(subObj)
elif obj[key] is None:
return_str += "null"
else:
return_str += str(obj[key])
return return_str
class ApiListenAsyncIterable:
"""Listen websocket iterator."""
def __init__(self, api, ws, channels, sign):
self.api = api
self.ws = ws
self.channels = channels
self.sign = sign
self.sub_data_sent = False
self.auth_sent = False
def __aiter__(self):
return self
async def __anext__(self):
if not self.sub_data_sent:
sub_data = {
"id": random.randint(0, 2**63 - 1),
"method": "subscribe",
"params": {"channels": self.channels},
"nonce": int(time.time()),
}
# [0] sign auth request to listen private methods
if not self.auth_sent and self.sign:
await self.ws.send(json.dumps(self.api.sign("public/auth", {})))
self.auth_sent = True
# [0] if not sign start connection with subscription
if not self.sign and not self.sub_data_sent:
await self.ws.send(json.dumps(sub_data))
self.sub_data_sent = True
async with async_timeout.timeout(60) as tm:
data = await self.ws.recv()
tm.shift(60)
if data:
data = json.loads(data)
result = data.get("result")
# [1] send heartbeat to keep connection alive
if data["method"] == "public/heartbeat":
await self.ws.send(
json.dumps(
{
"id": data["id"],
"method": "public/respond-heartbeat",
}
)
)
# [3] consume data
elif self.sub_data_sent and result:
if result["subscription"] not in self.channels:
raise ApiError(
f'Wrong channel data received: {result["subscription"]} '
f'not in {self.channels}'
)
return result
# [2] subscribe to channels
if data["method"] == "public/auth" and data["code"] == 0:
await self.ws.send(json.dumps(sub_data))
self.sub_data_sent = True
elif "code" not in data or data["code"] != 0:
raise ApiAuthError(f"{data}")
[docs]
class ApiProvider:
"""Provides HTTP-api requests and websocket requests."""
def __init__(
self,
*,
api_key="",
api_secret="",
from_env=False,
auth_required=True,
timeout=5,
retries=5,
root_url="https://api.crypto.com/exchange/v1/",
ws_root_url="wss://stream.crypto.com/exchange/v1/",
logger=None,
):
self.ssl_context = httpx.create_ssl_context()
self.api_key = api_key
self.api_secret = api_secret
self.root_url = root_url
self.ws_root_url = ws_root_url
self.timeout = timeout
self.retries = retries
self.last_request_path = ""
self.rate_limiters = {}
for urls in RATE_LIMITS:
for url in urls:
self.rate_limiters[url] = aiolimiter.AsyncLimiter(
*RATE_LIMITS[urls]
)
# limits for not matched methods
self.general_private_limit = aiolimiter.AsyncLimiter(3, 0.1)
self.general_public_limit = aiolimiter.AsyncLimiter(100, 1)
if not auth_required:
return
if from_env:
self.read_keys_from_env()
if not self.api_key or not self.api_secret:
raise ValueError("Provide api_key and api_secret")
def read_keys_from_env(self):
self.api_key = os.environ.get("CRYPTOCOM_API_KEY", "")
if not self.api_key:
raise ValueError("Provide CRYPTOCOM_API_KEY env value")
self.api_secret = os.environ.get("CRYPTOCOM_API_SECRET", "")
if not self.api_secret:
raise ValueError("Provide CRYPTOCOM_API_SECRET env value")
def sign(self, path, data):
data = data or {}
data["method"] = path
sign_time = int(time.time() * 1000)
data.update({"nonce": sign_time, "api_key": self.api_key})
data["id"] = random.randint(0, 2**63 - 1)
data_params = data.get("params", {})
params = ""
if data_params:
params = params_to_str(data_params, 0)
payload = f"{path}{data['id']}{self.api_key}{params}{data['nonce']}"
data["sig"] = hmac.new(
bytes(str(self.api_secret), "utf-8"),
msg=bytes(payload, "utf-8"),
digestmod=hashlib.sha256,
).hexdigest()
return data
def get_limiter(self, path):
if path in self.rate_limiters:
return self.rate_limiters[path]
else:
if path.startswith("private"):
return self.general_private_limit
elif path.startswith("public"):
return self.general_public_limit
else:
raise ApiError(f"Wrong path: {path}")
async def request(self, method, path, params=None, data=None, sign=False):
original_data = data
limiter = self.get_limiter(path)
count = 0
while count <= self.retries:
client = httpx.AsyncClient(
timeout=httpx.Timeout(timeout=self.timeout),
verify=self.ssl_context,
)
if sign:
data = self.sign(path, original_data)
try:
async with limiter:
resp = await client.request(
method,
urljoin(self.root_url, path),
params=params,
json=data,
headers={"content-type": "application/json"},
)
resp_json = resp.json()
count += 1
if resp.status_code in [401, 400]:
raise ApiAuthError(resp_json)
elif resp.status_code != 200:
if count != self.retries:
continue
raise ApiError(
f"Error: {resp_json}. "
f"Status: {resp.status_code}. Json params: {data}"
)
except (
asyncio.TimeoutError,
httpx.HTTPError,
ssl.SSLError,
json.JSONDecodeError,
) as exc:
if count == self.retries:
raise ApiError(
f"Timeout or read error, retries: {self.retries}. "
f"Path: {path}. Data: {data}. Exc: {exc}"
) from exc
continue
finally:
await client.aclose()
if resp_json["code"] == 0:
result = resp_json.get("result", {})
if "data" in result:
result = result["data"]
if result is None:
continue
if data:
if data["id"] != resp_json["id"]:
raise ApiError(f"Not matched req = resp {resp_json}")
return result
if count == self.retries:
raise ApiError(
f"System error, retries: {self.retries}. "
f"Code: {resp.status_code}. Json: {resp_json}. "
f"Data: {data}"
)
async def get(self, path, params=None, sign=False):
return await self.request("get", path, params=params, sign=sign)
async def post(self, path, data=None, sign=True):
return await self.request("post", path, data=data, sign=sign)
async def listen(self, url, *channels, sign=False):
url = urljoin(self.ws_root_url, url)
async for ws in websockets.connect(url, open_timeout=self.timeout):
try:
dataiterator = ApiListenAsyncIterable(self, ws, channels, sign)
async for data in dataiterator:
if data:
yield data
except (websockets.ConnectionClosed, asyncio.TimeoutError):
continue
[docs]
class RecordApiProvider(ApiProvider):
"""Captures API and websocket responses into json files.
If capture=False it will read them to reproduce same responses in order."""
def __init__(
self,
*,
cache_file: pathlib.Path,
capture: bool = False,
divide_delay: int = 1,
fake_account_id: bool = True,
):
self.capture = capture
self.cache_file = cache_file
self.divide_delay = divide_delay
# TODO: implement auto-replacement
self.fake_account_id = fake_account_id
if self.capture:
self.cache_file.parent.mkdir(exist_ok=True, parents=True)
# TODO: implement correct overwrite
# if self.cache_file.exists():
# self.cache_file.unlink()
# self.cache_file.touch()
if self.cache_file.exists():
self.records = json.loads(self.cache_file.read_text())
else:
self.records = {}
kwargs = {"from_env": capture}
if not self.capture:
kwargs["api_key"] = "dummy"
kwargs["api_secret"] = "dummy"
super().__init__(**kwargs)
async def request(self, method, path, params=None, data=None, sign=False):
key = f"{method}_{path}"
args_data = data or params or {}
args_data = sorted(args_data.items(), key=lambda v: v[0])
args = ",".join(f"{key}={value}" for key, value in args_data)
if self.capture:
timestamp = time.time()
response = await super().request(
method, path, params=params, data=data, sign=sign
)
self.records.setdefault(key, {}).setdefault(args, []).append(
{
"response": response,
"exec_time": time.time() - timestamp,
"timestamp": timestamp,
}
)
else:
try:
record = self.records[key][args].pop(0)
except KeyError:
raise ApiError("No path found")
await asyncio.sleep(record["exec_time"] / self.divide_delay)
response = record["response"]
return response
async def listen(self, url, *channels, sign=False):
key = "ws"
args = ",".join(channels)
if self.capture:
timestamp = time.time()
async for response in super().listen(url, *channels, sign=sign):
self.records.setdefault(key, {}).setdefault(args, []).append(
{
"response": response,
"exec_time": time.time() - timestamp,
"timestamp": timestamp,
}
)
timestamp = time.time()
yield response
else:
for record in self.records[key][args]:
await asyncio.sleep(record["exec_time"] / self.divide_delay)
yield record["response"]
def save(self):
if self.capture:
for key, args in self.records.items():
for arg, data in args.items():
args[arg] = sorted(data, key=lambda v: v["timestamp"])
self.cache_file.write_text(json.dumps(self.records, indent=2))