from typing import AsyncGenerator, Dict, List, Optional

from . import pairs
from .api import ApiProvider
from .structs import (
    Candle,
    DefaultPairDict,
    MarketTicker,
    MarketTrade,
    OrderBook,
    OrderInBook,
    OrderSide,
    Pair,
    Timeframe,
)
class Exchange:
    """Interface to base exchange methods.

    Reads market data through the ``public/...`` endpoints of the
    configured API provider and streams updates from its ``market``
    channels.
    """

    def __init__(self, api: Optional[ApiProvider] = None):
        """Create the exchange interface.

        :param api: provider used for every request. When omitted (or
            falsy) a provider created with ``auth_required=False`` is used.
        """
        self.api = api or ApiProvider(auth_required=False)
        # Seed the lookup table from the pairs known to the local ``pairs``
        # module; sync_pairs() replaces it with the exchange's own list.
        self.pairs = DefaultPairDict(
            **{pair.name: pair for pair in pairs.all()}
        )

    @staticmethod
    def _build_orderbook(pair: Pair, snapshot: dict) -> OrderBook:
        """Convert one raw book snapshot with ``bids``/``asks`` keys."""
        buys = [
            OrderInBook.from_api(order, pair, OrderSide.BUY)
            for order in snapshot["bids"]
        ]
        # Asks are stored in the reverse of the order the API sends them.
        sells = [
            OrderInBook.from_api(order, pair, OrderSide.SELL)
            for order in reversed(snapshot["asks"])
        ]
        return OrderBook(buys, sells, pair)

    async def sync_pairs(self):
        """Use this method to sync pairs if you have issues with missing
        pairs in library side.

        Replaces ``self.pairs`` with the pairs returned by
        :meth:`get_pairs`, keyed by pair name.
        """
        fresh_pairs = await self.get_pairs()
        self.pairs = DefaultPairDict(
            **{pair.name: pair for pair in fresh_pairs}
        )

    async def get_pairs(self) -> List[Pair]:
        """List all available market pairs.

        Instruments whose symbol contains ``-`` or ``@`` are skipped. The
        list is only returned; call :meth:`sync_pairs` to store it in
        ``self.pairs``.
        """
        data = await self.api.get("public/get-instruments")
        return [
            Pair(
                i["symbol"],
                price_precision=i["quote_decimals"],
                quantity_precision=i["quantity_decimals"],
            )
            for i in data
            if "-" not in i["symbol"] and "@" not in i["symbol"]
        ]

    async def get_orderbook(self, pair: Pair, depth: int = 150) -> OrderBook:
        """Get the order book for a particular market.

        :param pair: market to query.
        :param depth: value sent as the ``depth`` request parameter.
        :return: order book built from the first entry of the response.
        """
        data = await self.api.get(
            "public/get-book",
            {"instrument_name": pair.exchange_name, "depth": depth},
        )
        return self._build_orderbook(pair, data[0])

    async def get_candles(
        self,
        pair: Pair,
        timeframe: Timeframe,
        start_ts: Optional[int] = None,
        end_ts: Optional[int] = None,
        count: int = 1500,
        include_all: bool = False,
        include_last: bool = False,
    ) -> List[Candle]:
        """Fetch candles for a market, paging backwards in chunks of 300.

        Each request asks for 300 candles. After every chunk the window is
        moved back by 300 candle intervals from the earliest candle just
        received, and the new chunk is prepended to the result.

        :param pair: market to query.
        :param timeframe: candle interval; ``timeframe.value`` is sent.
        :param start_ts: start of the first window. Bounds are sent only
            when both the start and the end are set. Presumably seconds
            (the value is multiplied by 1000 before sending) -- confirm.
        :param end_ts: end of the window, same unit as ``start_ts``. When
            given it stays fixed while the start moves back.
        :param count: number of candles to return unless ``include_all``.
        :param include_all: keep paging until the API returns a short or
            empty chunk and return everything collected.
        :param include_last: keep the final candle of the result; by
            default it is dropped (presumably the still-open candle).
        :return: collected candles, earliest chunk first; an empty list
            when the API returns no data.
        """
        chunk_size = 300
        result = []
        chunk_start_ts = start_ts
        chunk_end_ts = end_ts
        prev_timestamps = set()
        # One extra candle is fetched when the last one will be dropped.
        max_count = count if include_last else count + 1

        while True:
            params = {
                "instrument_name": pair.exchange_name,
                "timeframe": timeframe.value,
                "count": chunk_size,
            }
            if chunk_start_ts and chunk_end_ts:
                params.update(
                    {
                        "start_ts": int(chunk_start_ts * 1000),
                        "end_ts": int(chunk_end_ts * 1000),
                    }
                )
            data = await self.api.get("public/get-candlestick", params)

            # Drop candles already seen in the previous chunk (windows can
            # share their boundary candle).
            parsed = (Candle.from_api(pair, candle) for candle in data)
            candles = [
                candle
                for candle in parsed
                if candle.time not in prev_timestamps
            ]
            prev_timestamps = {candle.time for candle in candles}
            result = candles + result

            if (
                not data
                or len(data) < chunk_size
                or (len(result) >= max_count and not include_all)
            ):
                break

            # Two fresh candles are needed to measure the interval; with
            # fewer the window cannot be moved, so stop instead of raising
            # IndexError below.
            if len(candles) < 2:
                break

            # NOTE: [start1, end1], [start0, end0]
            size = candles[1].time - candles[0].time
            if not end_ts:
                chunk_end_ts = candles[0].time
            chunk_start_ts = candles[0].time - size * chunk_size

        # Guard against an empty result: the API may return no candles.
        if not include_last and result:
            del result[-1]

        if not include_all:
            # NOTE(review): this keeps the earliest ``count`` candles of the
            # collected range, not the most recent ones -- confirm intent.
            result = result[:count]

        return result

    async def get_trades(self, pair: Pair) -> List[MarketTrade]:
        """Get last 200 trades in a specified market.

        The trades are returned in the reverse of the API order.
        """
        data = await self.api.get(
            "public/get-trades", {"instrument_name": pair.exchange_name}
        )
        return [MarketTrade.from_api(pair, trade) for trade in reversed(data)]

    async def get_ticker(self, pair: Pair) -> MarketTicker:
        """Get ticker for the provided pair.

        Uses the first entry of the ``public/get-tickers`` response.
        """
        data = await self.api.get(
            "public/get-tickers", {"instrument_name": pair.exchange_name}
        )
        return MarketTicker.from_api(pair, data[0])

    async def get_tickers(self) -> Dict[Pair, MarketTicker]:
        """Get tickers in all available markets.

        Tickers whose ``"i"`` field is not in ``self.pairs`` are skipped.
        """
        data = await self.api.get("public/get-tickers")
        tickers = {}
        for ticker in data:
            name = ticker["i"]
            if name not in self.pairs:
                continue
            pair = self.pairs[name]
            tickers[pair] = MarketTicker.from_api(pair, ticker)
        return tickers

    async def get_price(self, pair: Pair) -> float:
        """Get latest price of pair (the ticker's ``trade_price``)."""
        ticker = await self.get_ticker(pair)
        return ticker.trade_price

    async def listen_candles(
        self, timeframe: Timeframe, *pairs: Pair
    ) -> AsyncGenerator[Candle, None]:
        """Stream candles for the given pairs.

        :param timeframe: candle interval; must be a ``Timeframe`` member.
        :param pairs: markets to subscribe to.
        :raises ValueError: if ``timeframe`` is not a ``Timeframe``. As
            this is an async generator, it is raised on first iteration.
        """
        if not isinstance(timeframe, Timeframe):
            raise ValueError(f"Provide Timeframe enum not {timeframe}")

        channels = [
            f"candlestick.{timeframe.value}.{pair.exchange_name}"
            for pair in pairs
        ]
        async for data in self.api.listen("market", *channels):
            pair = self.pairs[data["instrument_name"]]
            for candle in data["data"]:
                yield Candle.from_api(pair, candle)

    async def listen_trades(
        self, *pairs: Pair
    ) -> AsyncGenerator[MarketTrade, None]:
        """Stream trades for the given pairs.

        :param pairs: markets to subscribe to.
        """
        channels = [f"trade.{pair.exchange_name}" for pair in pairs]
        async for data in self.api.listen("market", *channels):
            # The pair is the same for every trade in one message.
            pair = self.pairs[data["instrument_name"]]
            for trade in data["data"]:
                yield MarketTrade.from_api(pair, trade)

    async def listen_orderbook(
        self, *pairs: Pair
    ) -> AsyncGenerator[OrderBook, None]:
        """Stream order books for the given pairs.

        Subscribes to the ``book.<pair>.50`` channels and yields one order
        book per message, built from the first entry of its ``data``.

        :param pairs: markets to subscribe to.
        """
        channels = [f"book.{pair.exchange_name}.50" for pair in pairs]
        async for data in self.api.listen("market", *channels):
            pair = self.pairs[data["instrument_name"]]
            yield self._build_orderbook(pair, data["data"][0])