由于工作需要,我用全套洋垃圾(二手拆机件)组装了一台 E5-2696 V3 双路服务器,但在使用中发现,用 .NET Core 编写的程序无法利用超过 36 个核心:一次只能用上前 36 个或后 36 个。
查阅资料后发现,Windows Server 会把 CPU 的逻辑处理器划分为若干处理器组(processor group),这台机器被分成了 2 组,每组 36 个核心。
默认情况下,一个进程只会运行在其中一个组里,这就导致程序只能用到一半的核心来跑——那双路还有什么意义?于是继续查资料,最终通过 Windows API 解决了这个问题。
参考资料:
Processor Groups – Win32 apps | Microsoft Docs
SetThreadAffinityMask function (winbase.h) – Win32 apps | Microsoft Docs
从微软的文档可知,要想使用超过 64 个逻辑处理器的 CPU,就需要用到“处理器组”这个概念,微软也提供了相应的 API 来进行操作。
下面是用 .NET Core 实现的、可以使用任意数量 CPU 的代码:
用法与 Parallel.ForEach 相同:调用 ThreadAffinityTools.ForEach 即可让程序用上全部 CPU 核心。
最终效果如图:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace xL.Tools.Standard
{
    /// <summary>
    /// Runs work on threads pinned to individual logical processors across all Windows
    /// processor groups, so one process can use every processor even when Windows has split
    /// them into several groups.
    /// </summary>
    /// <remarks>
    /// Windows-only: relies on the kernel32 processor-group APIs.
    /// Processors are addressed by a global index: the active processors of group 0 come
    /// first, then those of group 1, and so on.
    /// </remarks>
    public static class ThreadAffinityTools
    {
        // Number of active logical processors in each processor group, indexed by group number.
        static readonly int[] groupSizes;

        /// <summary>Total number of active logical processors across all processor groups.</summary>
        public static int ProcessCount;

        static ThreadAffinityTools()
        {
            ushort groupCount = GetActiveProcessorGroupCount();
            groupSizes = new int[groupCount];
            for (ushort group = 0; group < groupCount; group++)
            {
                groupSizes[group] = GetActiveProcessorCount(group);
                ProcessCount += groupSizes[group];
            }
        }

        [DllImport("kernel32.dll")]
        static extern IntPtr GetCurrentThread();

        [DllImport("kernel32.dll")]
        static extern ushort GetActiveProcessorGroupCount();

        // Native signature: DWORD GetActiveProcessorCount(WORD GroupNumber).
        [DllImport("kernel32.dll")]
        static extern int GetActiveProcessorCount(ushort group);

        [DllImport("kernel32.dll", SetLastError = true)]
        [return: MarshalAs(UnmanagedType.Bool)]
        static extern bool SetThreadGroupAffinity(IntPtr handle, ref GROUP_AFFINITY affinity, ref GROUP_AFFINITY previousAffinity);

        /// <summary>
        /// Pins <paramref name="hThread"/> to the single logical processor with global index
        /// <paramref name="cpuid"/>.
        /// </summary>
        /// <param name="hThread">Handle of the thread to pin.</param>
        /// <param name="cpuid">Global processor index in [0, ProcessCount); out-of-range values are NOT wrapped.</param>
        /// <returns>
        /// <c>true</c> if SetThreadGroupAffinity succeeded; <c>false</c> if the index is out of
        /// range or the call failed.
        /// </returns>
        public static bool SetThreadAffinity(IntPtr hThread, int cpuid)
        {
            return TryLocateProcessor(cpuid, out var group, out var bit)
                && ApplyGroupAffinity(hThread, group, bit);
        }

        /// <summary>
        /// Pins the calling thread to the logical processor with global index <paramref name="cpuid"/>.
        /// Non-negative indices wrap modulo <see cref="ProcessCount"/>, so callers may start more
        /// threads than there are processors.
        /// </summary>
        /// <param name="cpuid">Global processor index; negative values are not found.</param>
        /// <returns><c>true</c> if SetThreadGroupAffinity succeeded; otherwise <c>false</c>.</returns>
        public static bool SetThreadAffinity(int cpuid)
        {
            // Modulo rather than repeated subtraction: same result for non-negative ids, and a
            // ProcessCount of 0 can no longer spin forever.
            var id = cpuid >= 0 && ProcessCount > 0 ? cpuid % ProcessCount : cpuid;
            if (TryLocateProcessor(id, out var group, out var bit))
            {
                return ApplyGroupAffinity(GetCurrentThread(), group, bit);
            }
            Console.WriteLine($"线程{cpuid}没有找到cpu");
            return false;
        }

        // Maps a global processor index to (group number, bit position within that group's mask).
        static bool TryLocateProcessor(int cpuid, out ushort group, out int bit)
        {
            group = 0;
            bit = 0;
            if (cpuid < 0)
            {
                return false;
            }
            int remaining = cpuid;
            for (int g = 0; g < groupSizes.Length; g++)
            {
                if (remaining < groupSizes[g])
                {
                    group = (ushort)g;
                    bit = remaining;
                    return true;
                }
                remaining -= groupSizes[g];
            }
            return false;
        }

        // Restricts the thread to exactly one processor of the given group.
        // NOTE(review): assumes a group's active processors occupy mask bits 0..n-1 — confirm on
        // systems with inactive or hot-added processors.
        static bool ApplyGroupAffinity(IntPtr hThread, ushort group, int bit)
        {
            var newAffinity = new GROUP_AFFINITY
            {
                Group = group,
                Mask = new UIntPtr(1UL << bit)
            };
            var previousAffinity = new GROUP_AFFINITY();
            return SetThreadGroupAffinity(hThread, ref newAffinity, ref previousAffinity);
        }

        /// <summary>
        /// Runs <paramref name="action"/> once on every logical processor (one pinned thread per
        /// processor) and blocks until all invocations have finished.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
        /// <exception cref="AggregateException">One or more invocations threw; wraps their exceptions.</exception>
        public static void Run(Action action)
        {
            if (action == null) throw new ArgumentNullException(nameof(action));
            RunPinnedWorkers(ProcessCount, index => action(), ThreadPriority.Normal);
        }

        /// <summary>
        /// Invokes <paramref name="body"/> for every element of <paramref name="source"/> on pinned
        /// worker threads that pull items from a shared queue, and blocks until all items are done.
        /// </summary>
        /// <param name="source">Items to process; enumerated exactly once, before any worker starts.</param>
        /// <param name="body">Action applied to each item; may run concurrently on several threads.</param>
        /// <param name="th_count">Number of worker threads; 0 means <see cref="ProcessCount"/>.</param>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="body"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="th_count"/> is negative.</exception>
        /// <exception cref="AggregateException">One or more invocations of <paramref name="body"/> threw.</exception>
        public static void ForEach<TSource>(IEnumerable<TSource> source, Action<TSource> body, int th_count = 0)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (body == null) throw new ArgumentNullException(nameof(body));
            if (th_count < 0) throw new ArgumentOutOfRangeException(nameof(th_count), "Thread count must be non-negative.");

            var threadCount = th_count == 0 ? ProcessCount : th_count;
            // Items are enqueued in random order, presumably so that runs of expensive neighbouring
            // items get spread across threads. The queue is completely filled before any worker
            // starts, so a worker is done as soon as a dequeue fails.
            var queue = new System.Collections.Concurrent.ConcurrentQueue<TSource>(source.OrderBy(o => Guid.NewGuid()));
            RunPinnedWorkers(threadCount, index =>
            {
                while (queue.TryDequeue(out var item))
                {
                    body(item);
                }
            }, ThreadPriority.Normal);
        }

        /// <summary>
        /// Invokes <paramref name="body"/> for every element of <paramref name="source"/>, giving each
        /// pinned worker thread (priority Highest) one contiguous slice of the items, and blocks
        /// until all slices are done.
        /// </summary>
        /// <param name="source">Items to process; materialized once before any worker starts.</param>
        /// <param name="body">Action applied to each item; may run concurrently on several threads.</param>
        /// <param name="th_count">Number of worker threads; 0 means <see cref="ProcessCount"/>.</param>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="body"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="th_count"/> is negative.</exception>
        /// <exception cref="AggregateException">One or more invocations of <paramref name="body"/> threw.</exception>
        public static void ForEach2<TSource>(IEnumerable<TSource> source, Action<TSource> body, int th_count = 0)
        {
            if (source == null) throw new ArgumentNullException(nameof(source));
            if (body == null) throw new ArgumentNullException(nameof(body));
            if (th_count < 0) throw new ArgumentOutOfRangeException(nameof(th_count), "Thread count must be non-negative.");

            var threadCount = th_count == 0 ? ProcessCount : th_count;
            // Materialize once so workers index into a shared snapshot instead of each one
            // re-enumerating the source (O(n) per thread, wrong for one-shot sequences).
            var items = source.ToArray();
            // Ceiling division: each thread gets a contiguous slice of at most perThread items.
            var perThread = items.Length / threadCount + (items.Length % threadCount == 0 ? 0 : 1);
            RunPinnedWorkers(threadCount, index =>
            {
                // long arithmetic avoids int overflow of index * perThread on very large inputs.
                long start = (long)index * perThread;
                long end = Math.Min(start + perThread, items.Length);
                for (long i = start; i < end; i++)
                {
                    body(items[i]);
                }
            }, ThreadPriority.Highest);
        }

        /// <summary>
        /// Starts <paramref name="threadCount"/> background threads, pins thread i to global
        /// processor i (wrapping modulo ProcessCount), runs <paramref name="work"/>(i) on it, and
        /// blocks until every thread has finished.
        /// </summary>
        /// <exception cref="AggregateException">
        /// Thrown after all threads finish if any work item, or the launcher itself, threw.
        /// </exception>
        static void RunPinnedWorkers(int threadCount, Action<int> work, ThreadPriority priority)
        {
            var workers = new Thread[threadCount];
            var errors = new System.Collections.Concurrent.ConcurrentQueue<Exception>();

            // Workers are created from a launcher pinned to the last processor. The Windows
            // processor-group docs state that a new thread starts in the group of the thread that
            // created it; each worker then re-pins itself to its own processor.
            var launcher = new Thread(() =>
            {
                try
                {
                    SetThreadAffinity(ProcessCount - 1);
                    for (int i = 0; i < threadCount; i++)
                    {
                        var worker = new Thread(state =>
                        {
                            var index = (int)state;
                            try
                            {
                                SetThreadAffinity(index);
                                work(index);
                            }
                            catch (Exception ex)
                            {
                                // An unhandled exception on a raw Thread terminates the process;
                                // collect it and surface it on the calling thread instead.
                                errors.Enqueue(ex);
                            }
                        });
                        worker.Priority = priority;
                        worker.IsBackground = true;
                        workers[i] = worker;
                        worker.Start(i);
                    }
                }
                catch (Exception ex)
                {
                    errors.Enqueue(ex);
                }
            });
            launcher.Start();
            launcher.Join();

            foreach (var worker in workers)
            {
                // A slot stays null if the launcher failed before creating that worker.
                worker?.Join();
            }

            if (!errors.IsEmpty)
            {
                throw new AggregateException(errors);
            }
        }

        /// <summary>
        /// Managed mirror of the native GROUP_AFFINITY structure: a processor mask within one
        /// processor group. Default sequential packing reproduces the native layout
        /// (16 bytes on 64-bit, 12 bytes on 32-bit).
        /// </summary>
        [StructLayout(LayoutKind.Sequential)]
        internal struct GROUP_AFFINITY
        {
            public UIntPtr Mask;
            public UInt16 Group;
            private UInt16 Reserved0;
            private UInt16 Reserved1;
            private UInt16 Reserved2;
        }
    }
}