我不明白为什么,但是在客户端库中似乎没有机制为
Windows Azure表存储并行执行许多查询.我创建了一个可以用来节省大量时间的模板类,欢迎你使用它.不过,如果你能把它分开,并提供关于如何改进这个课程的反馈意见,我将不胜感激.
public class AsyncDataQuery<T> where T: new()
{
public AsyncDataQuery(bool preserve_order)
{
m_preserve_order = preserve_order;
this.Queries = new List<CloudTableQuery<T>>(1000);
}
public void AddQuery(IQueryable<T> query)
{
var data_query = (DataServiceQuery<T>)query;
var uri = data_query.RequestUri; // required
this.Queries.Add(new CloudTableQuery<T>(data_query));
}
/// <summary>
/// Blocking but still optimized.
/// </summary>
public List<T> Execute()
{
this.BeginAsync();
return this.EndAsync();
}
public void BeginAsync()
{
if (m_preserve_order == true)
{
this.Items = new List<T>(Queries.Count);
for (var i = 0; i < Queries.Count; i++)
{
this.Items.Add(new T());
}
}
else
{
this.Items = new List<T>(Queries.Count * 2);
}
m_wait = new ManualResetEvent(false);
for (var i = 0; i < Queries.Count; i++)
{
var query = Queries[i];
query.BeginExecuteSegmented(callback,i);
}
}
public List<T> EndAsync()
{
m_wait.WaitOne();
m_wait.dispose();
return this.Items;
}
private List<T> Items { get; set; }
private List<CloudTableQuery<T>> Queries { get; set; }
private bool m_preserve_order;
private ManualResetEvent m_wait;
private int m_completed = 0;
private object m_lock = new object();
private void callback(IAsyncResult ar)
{
int i = (int)ar.AsyncState;
CloudTableQuery<T> query = Queries[i];
var response = query.EndExecuteSegmented(ar);
if (m_preserve_order == true)
{ // preserve ordering only supports one result per query
lock (m_lock)
{
this.Items[i] = response.Results.Single();
}
}
else
{ // add any number of items
lock (m_lock)
{
this.Items.AddRange(response.Results);
}
}
if (response.HasMoreResults == true)
{ // more data to pull
query.BeginExecuteSegmented(response.ContinuationToken,callback,i);
return;
}
m_completed = Interlocked.Increment(ref m_completed);
if (m_completed == Queries.Count)
{
m_wait.Set();
}
}
}
解决方法
猜猜我迟到了.我会添加两件事情:
> ManualResetEvent是Idisposable.所以你需要确保它处于某个地方.
>错误处理 – 如果一个查询失败,它可能会失败的整个事情.您应该重试失败的请求.或者,您可以返回您回复的值,并显示哪些查询失败,以便调用者可以重试查询.
>客户端超时 – 没有.如果服务器端超时,这不是一个问题,但是如果这样做永远失败(例如,网络问题),客户端将永久挂起.
此外,我认为这实际上是一个更好的方法,任务并行库.在此之前,我尝试了每个查询任务的方法.代码实际上更尴尬,而且往往导致了很多活动线程.我仍然没有用你的代码进行广泛的测试,但是在第一次脸红的时候似乎更好.
更新
我已经把一些工作做了一个或多或少的重写上面的代码.我的重写消除了所有的锁定,支持客户端超时的挂起事务(很少,但它确实发生,真的可以毁了你的一天)和一些异常处理逻辑.有一个完整的解决方案,测试在Bitbucket.最相关的代码生活在one file,虽然它需要一些帮助者在项目的其他部分.