Skip to content

fix Poloniex trade stream #738

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 17, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 50 additions & 33 deletions src/ExchangeSharp/API/Exchanges/Poloniex/ExchangePoloniexAPI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -326,29 +326,31 @@ protected override async Task<IReadOnlyDictionary<string, ExchangeCurrency>> OnG

protected override async Task<IEnumerable<string>> OnGetMarketSymbolsAsync()
{
List<string> symbols = new List<string>();
var tickers = await GetTickersAsync();
foreach (var kv in tickers)
{
symbols.Add(kv.Key);
}
return symbols;
return (await GetMarketSymbolsMetadataAsync()).Where(x => x.IsActive.Value).Select(x => x.MarketSymbol);
}

protected internal override async Task<IEnumerable<ExchangeMarket>> OnGetMarketSymbolsMetadataAsync()
{
//https://poloniex.com/public?command=returnOrderBook&currencyPair=all&depth=0
/*
* "BTC_CLAM": {
"asks": [],
"bids": [],
"isFrozen": "0",
"seq": 37268918
}, ...
//https://docs.poloniex.com/#returnticker
/*
{
"BTC_BTS": {
"id": 14,
"last": "0.00000090",
"lowestAsk": "0.00000091",
"highestBid": "0.00000089",
"percentChange": "-0.02173913",
"baseVolume": "0.28698296",
"quoteVolume": "328356.84081156",
"isFrozen": "0",
"postOnly": "0",
"high24hr": "0.00000093",
"low24hr": "0.00000087"
},...
*/

var markets = new List<ExchangeMarket>();
Dictionary<string, JToken> lookup = await MakeJsonRequestAsync<Dictionary<string, JToken>>("/public?command=returnOrderBook&currencyPair=all&depth=0");
var markets = new List<ExchangeMarket>();
Dictionary<string, JToken> lookup = await MakeJsonRequestAsync<Dictionary<string, JToken>>("/public?command=returnTicker");
// StepSize is 8 decimal places for both price and amount on everything at Polo
const decimal StepSize = 0.00000001m;
const decimal minTradeSize = 0.0001m;
Expand All @@ -357,16 +359,18 @@ protected internal override async Task<IEnumerable<ExchangeMarket>> OnGetMarketS
{
var market = new ExchangeMarket { MarketSymbol = kvp.Key, IsActive = false };

string isFrozen = kvp.Value["isFrozen"].ToStringInvariant();
if (string.Equals(isFrozen, "0"))
string isFrozen = kvp.Value["isFrozen"].ToStringInvariant();
string postOnly = kvp.Value["postOnly"].ToStringInvariant();
if (string.Equals(isFrozen, "0") && string.Equals(postOnly, "0"))
{
market.IsActive = true;
}

string[] pairs = kvp.Key.Split('_');
if (pairs.Length == 2)
{
market.QuoteCurrency = pairs[0];
market.MarketId = kvp.Value["id"].ToStringLowerInvariant();
market.QuoteCurrency = pairs[0];
market.BaseCurrency = pairs[1];
market.PriceStepSize = StepSize;
market.QuantityStepSize = StepSize;
Expand Down Expand Up @@ -440,10 +444,19 @@ protected override async Task<IWebSocket> OnGetTickersWebSocketAsync(Action<IRea

protected override async Task<IWebSocket> OnGetTradesWebSocketAsync(Func<KeyValuePair<string, ExchangeTrade>, Task> callback, params string[] marketSymbols)
{
Dictionary<int, Tuple<string, long>> messageIdToSymbol = new Dictionary<int, Tuple<string, long>>();
Dictionary<int, string> messageIdToSymbol = new Dictionary<int, string>();
Dictionary<string, int> symbolToMessageId = new Dictionary<string, int>();
var symMeta = await GetMarketSymbolsMetadataAsync();
foreach (var symbol in symMeta)
{
messageIdToSymbol.Add(int.Parse(symbol.MarketId), symbol.MarketSymbol);
symbolToMessageId.Add(symbol.MarketSymbol, int.Parse(symbol.MarketId));
}
return await ConnectPublicWebSocketAsync(string.Empty, async (_socket, msg) =>
{
JToken token = JToken.Parse(msg.ToStringFromUTF8());
if (token.Type == JTokenType.Object && token["error"] != null)
throw new APIException($"Exchange returned error: {token["error"].ToStringInvariant()}");
int msgId = token[0].ConvertInvariant<int>();

if (msgId == 1010 || token.Count() == 2) // "[7,2]"
Expand All @@ -459,18 +472,17 @@ protected override async Task<IWebSocket> OnGetTradesWebSocketAsync(Func<KeyValu
var dataType = data[0].ToStringInvariant();
if (dataType == "i")
{
var marketInfo = data[1];
var market = marketInfo["currencyPair"].ToStringInvariant();
messageIdToSymbol[msgId] = new Tuple<string, long>(market, 0);
// can also populate messageIdToSymbol from here
continue;
}
else if (dataType == "t")
{
if (messageIdToSymbol.TryGetValue(msgId, out Tuple<string, long> symbol))
{ // 0 1 2 3 4 5
// ["t", "<trade id>", <1 for buy 0 for sell>, "<price>", "<size>", <timestamp>]
ExchangeTrade trade = data.ParseTrade(amountKey: 4, priceKey: 3, typeKey: 2, timestampKey: 5,
timestampType: TimestampType.UnixSeconds, idKey: 1, typeKeyIsBuyValue: "1");
await callback(new KeyValuePair<string, ExchangeTrade>(symbol.Item1, trade));
if (messageIdToSymbol.TryGetValue(msgId, out string symbol))
{ // 0 1 2 3 4 5 6
// ["t", "<trade id>", <1 for buy 0 for sell>, "<price>", "<size>", <timestamp>, "<epoch_ms>"]
ExchangeTrade trade = data.ParseTrade(amountKey: 4, priceKey: 3, typeKey: 2, timestampKey: 6,
timestampType: TimestampType.UnixMilliseconds, idKey: 1, typeKeyIsBuyValue: "1");
await callback(new KeyValuePair<string, ExchangeTrade>(symbol, trade));
}
}
else if (dataType == "o")
Expand All @@ -484,14 +496,19 @@ protected override async Task<IWebSocket> OnGetTradesWebSocketAsync(Func<KeyValu
}
}, async (_socket) =>
{
IEnumerable<int> marketIDs = null;
if (marketSymbols == null || marketSymbols.Length == 0)
{
marketSymbols = (await GetMarketSymbolsAsync()).ToArray();
marketIDs = messageIdToSymbol.Keys;
}
else
{
marketIDs = marketSymbols.Select(s => symbolToMessageId[s]);
}
// subscribe to order book and trades channel for each symbol
foreach (var sym in marketSymbols)
foreach (var id in marketIDs)
{
await _socket.SendMessageAsync(new { command = "subscribe", channel = NormalizeMarketSymbol(sym) });
await _socket.SendMessageAsync(new { command = "subscribe", channel = id });
}
});
}
Expand Down