Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
4.0k views
in Technique[技术] by (71.8m points)

c# - Collection was modified error while processing data from socket stream

I'm trying to capture tick level data from a Binance Aggregated data stream. As soon as the date changes, a function processes the data from the previous day. I'm using Task.Run to create another thread as new data for the current day is still streaming in and needs to be captured. However, I'm getting a 'Collection was modified enumeration operation may not execute.' in the foreach loop of the function that processes the data. I'm not sure why tradeData would be modified after it is passed to the processing function? Not sure if it matters but I'm capturing about 40 different symbols on separate threads.

private static void Start()
{
    foreach (SymbolData symbol in symbolData)
    {
         Task.Run(() => SubscribeToSymbol(symbol.symbol));
    }
}

private static void SubscribeToSymbol(string symbol)
{
    Dictionary<decimal, decimal> tradeData = new Dictionary<decimal, decimal>();
    DateTime lastTrade = DateTime.UtcNow;
    try
    {
         var socketClient = new BinanceSocketClient();
         socketClient.FuturesUsdt.SubscribeToAggregatedTradeUpdates(symbol, data =>
         {
              if (data.TradeTime.Date > lastTrade.Date)
              {
                   Task.Run(() => ProcessData(symbol, lastTrade.Date, tradeData));
                   tradeData.Clear();
              }
              lastTrade = data.TradeTime;

              if (tradeData.ContainsKey(data.Price))
              {
                   tradeData[data.Price] = tradeData[data.Price] + data.Quantity;
              }
              else
              {
                   tradeData[data.Price] = data.Quantity;
              }
         });
     }
     catch { }
}


private static void ProcessData(string symbol, DateTime date, Dictionary<decimal, decimal> tradeData)
{
     foreach (var price in tradeData)
     {
     //Error: Collection was modified enumeration operation may not execute.
     }
}

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

You're calling Task.Run repeatedly and as a result have more than one thread working with your tradeData object which is why you're seeing the collection modified exception.

I would recommend you rethink the code design so that you dont have multiple threads working with the same objects. If you absolutely must have threads working with the same objects then lock should be used:

private static object _objlock = new object();

private static void ProcessData(string symbol, DateTime date, Dictionary<decimal, decimal> tradeData)
{
    lock (_objlock)
    {
        foreach (var price in tradeData)
        {
     
        }
     }
}

Just be careful with deadlocks when using lock. I'd recommend reading up on locks and multi threading in C#.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...