memcachedclient client library enyimmemcachedclient 哪个好

EnyimMemcached - 1.2.0.2
Project Home: /
下载binary测试时发生错误,下载源码编译后没有再遇到
Configurations:
&configSections&
&sectionGroup&name=""&
&&&section&name="memcached"&type="Enyim.Caching.Configuration.MemcachedClientSection,&Enyim.Caching"&/&
&/sectionGroup&
&section&name="memcached"&type="Enyim.Caching.Configuration.MemcachedClientSection,&Enyim.Caching"&/&
&/configSections&
&memcached&
&&&servers&
&&&&&!--&put&your&own&server(s)&here--&
&&&&&add&address="127.0.0.1"&port="11211"&/&
&&&/servers&
&&&socketPool&minPoolSize="10"&maxPoolSize="100"&connectionTimeout="00:00:10"&deadTimeout="00:02:00"&/&
&/memcached&
&memcached&keyTransformer="Enyim.Caching.TigerHashTransformer,&Enyim.Caching"&
&&&add&address="127.0.0.1"&port="11211"&/&
&/servers&
&socketPool&minPoolSize="2"&maxPoolSize="100"&connectionTimeout="00:00:10"&deadTimeout="00:02:00"&/&
&/memcached&
Basic examples: get, set, expiration
MemcachedClient&mc&=&new&MemcachedClient();
mc.Store(StoreMode.Set,&"key_1",&"A".PadRight(20,&'A'));&//no expiration time
DateTime&expireAt&=&DateTime.Now.AddMinutes(0.5);&
mc.Store(StoreMode.Set,&"key_2",&"B".PadRight(20,&'B'),&expireAt);&//expired after 30s
mc.Store(StoreMode.Set,&"key_3",&"C".PadRight(20,&'C'),&new&TimeSpan(0,&0,&15));&//expired after 15s
Console.WriteLine("{0}:",&DateTime.Now.ToString("HH:mm:ss&fff"));
Console.WriteLine("\tkey_1:&{0}\tno expiration time",&mc.Get&string&("key_1"));
Console.WriteLine("\tkey_2:&{0}\texpired at {1}",&mc.Get&string&("key_2"),&expireAt.ToString("HH:mm:ss&fff"));
Console.WriteLine("\tkey_3:&{0}\texpired after 15s",&mc.Get&string&("key_3"));
Thread.Sleep(18&*&1000);&//make the thread sleep for 18s, key_3 should expired
Console.WriteLine("{0}: sleep 18s",&DateTime.Now.ToString("HH:mm:ss&fff"));
Console.WriteLine("\tkey_1:&{0}",&mc.Get&string&("key_1"));
Console.WriteLine("\tkey_2:&{0}",&mc.Get&string&("key_2"));
Console.WriteLine("\tkey_3:&{0}",&mc.Get&string&("key_3"));
mc.Store(StoreMode.Add,&"key_1",&"X".PadRight(20,&'X'));
mc.Store(StoreMode.Add,&"key_2",&"Y".PadRight(20,&'Y'));
mc.Store(StoreMode.Add,&"key_3",&"Z".PadRight(20,&'Z'));
Console.WriteLine("{0}: try to change values by using StoreMode.Add",&DateTime.Now.ToString("HH:mm:ss&fff"));
//make the thread sleep 15s, key_2 should expired and key_3 should be set a new value
Thread.Sleep(15&*&1000);
Console.WriteLine("{0}: sleep 15s",&DateTime.Now.ToString("HH:mm:ss&fff"));
Console.WriteLine("\tkey_1:&{0}",&mc.Get&string&("key_1"));
Console.WriteLine("\tkey_2:&{0}",&mc.Get&string&("key_2"));
Console.WriteLine("\tkey_3:&{0}",&mc.Get&string&("key_3"));
object get and set
public&enum&UserGender
&&&&Male&=&1,
&&&&Female&=&2,
&&&&Unspecified&=&0,
[Serializable]
public&class&User
&&&&public&int&ID&{&get;&set;&}
&&&&public&string&Name&{&get;&set;&}
&&&&public&DateTime&Birthday&{&get;&set;&}
&&&&public&UserGender&Gender&{&get;&set;&}
&&&&public&override&string&ToString()
&&&&&&&&return&new&StringBuilder()
&&&&&&&&&&&&.Append("User{")
&&&&&&&&&&&&.Append("ID:").Append(this.ID).Append(",&Name:\"").Append(this.Name).Append("\"")
&&&&&&&&&&&&.Append(",&Birthday:\"").Append(this.Birthday.ToString("yyyy-MM-dd")).Append("\"")
&&&&&&&&&&&&.Append(",&Gender:").Append(this.Gender)
&&&&&&&&&&&&.Append("}").ToString();
//object&get&and&set
User&user&=&new&User()
&&&&ID&=&601981,
&&&&Name&=&"",
&&&&Birthday&=&new&DateTime(1943,&2,&3),
&&&&Gender&=&UserGender.Male
mc.Store(StoreMode.Set,&"user",&user);
user&=&mc.Get&User&("user");
Console.WriteLine(user);
Test output:
18秒后key_3过期;随后的add命令,因为key_1和key_2仍有效,所以操作失败,而key_3已经过期,操作成功;再过15秒key_2过期
Multiple gets test
Attention: gets commands only supported by memcached 1.2.5 or higher versions, so the flowing code needs memcached 1.2.5 at least
IDictionary&string,&object&&multiResults&=&mc.Get(new&string[]&{&"key_1",&"key_2",&"key_3"&});
Console.WriteLine("gets command test");
foreach&(KeyValuePair&string,&object&&kvp&in&multiResults)
&&&&Console.WriteLine("\t{0}:&{1}",&kvp.Key,&kvp.Value);
MemcachedClient&mc&=&new&MemcachedClient();
mc.Store(StoreMode.Set,&"key_1",&"A".PadRight(20,&'A'));
mc.Store(StoreMode.Set,&"key_2",&"B".PadRight(20,&'B'));
IDictionary&string,&ulong&&casValues&=&null;
IDictionary&string,&object&&values&=&mc.Get(new&string[]&{&"key_1",&"key_2"&},&out&casValues);
Console.WriteLine("key_1:&{0}",&values["key_1"]);
Console.WriteLine("key_2:&{0}",&values["key_2"]);
mc.Store(StoreMode.Set,&"key_1",&"A".PadRight(20,&'X'));
mc.Get&string&("key_2");
Console.WriteLine("cas&key_1:&{0}",&mc.CheckAndSet("key_1",&"M".PadRight(20,&'M'),&casValues["key_1"]));
Console.WriteLine("cas&key_2:&{0}",&mc.CheckAndSet("key_2",&"N".PadRight(20,&'N'),&casValues["key_2"]));
Console.WriteLine("key_1&after&cas:&{0}",&mc.Get&string&("key_1"));
Console.WriteLine("key_2&after&cas:&{0}",&mc.Get&string&("key_2"));
Test output:
key_1因为在读取之后使用set命令更新了,因此cas操作失败,而key_2的cas操作成功
Consistent Hashing test
It's a bit difficult to test consistent hashing and load balance directly using Enyim, the following code token from Enyim will simplify this task
public&sealed&class&DefaultNodeLocator
&&&&private&const&int&ServerAddressMutations&=&100;
&&&&private&uint[]&
&&&&private&Dictionary&uint,&string&&servers&=&new&Dictionary&uint,&string&();
&&&&public&void&Initialize(IList&string&&nodes)
&&&&&&&&this.keys&=&new&uint[nodes.Count&*&DefaultNodeLocator.ServerAddressMutations];
&&&&&&&&int&nodeIdx&=&0;
&&&&&&&&foreach&(string&node&in&nodes)
&&&&&&&&&&&&List&uint&&tmpKeys&=&DefaultNodeLocator.GenerateKeys(node,&DefaultNodeLocator.ServerAddressMutations);
&&&&&&&&&&&&tmpKeys.ForEach(delegate(uint&k)&{&this.servers[k]&=&&});
&&&&&&&&&&&&tmpKeys.CopyTo(this.keys,&nodeIdx);
&&&&&&&&&&&&nodeIdx&+=&DefaultNodeLocator.ServerAddressM
&&&&&&&&Array.Sort&uint&(this.keys);
&&&&public&string&Locate(string&key)
&&&&&&&&if&(this.keys.Length&==&0)&return&null;
&&&&&&&&uint&itemKeyHash&=&BitConverter.ToUInt32(new&FNV1a().ComputeHash(Encoding.Unicode.GetBytes(key)),&0);
&&&&&&&&int&foundIndex&=&Array.BinarySearch&uint&(this.keys,&itemKeyHash);
&&&&&&&&if&(foundIndex&&&0)
&&&&&&&&&&&&foundIndex&=&~foundI
&&&&&&&&&&&&if&(foundIndex&==&0)&foundIndex&=&this.keys.Length&-&1;
&&&&&&&&&&&&else&if&(foundIndex&&=&this.keys.Length)&foundIndex&=&0;
&&&&&&&&if&(foundIndex&&&0&||&foundIndex&&&this.keys.Length)&return&null;
&&&&&&&&return&this.servers[this.keys[foundIndex]];
&&&&private&static&List&uint&&GenerateKeys(string&node,&int&numberOfKeys)
&&&&&&&&const&int&KeyLength&=&4;
&&&&&&&&const&int&PartCount&=&1;
&&&&&&&&List&uint&&k&=&new&List&uint&(PartCount&*&numberOfKeys);
&&&&&&&&for&(int&i&=&0;&i&&&numberOfK&i++)
&&&&&&&&&&&&byte[]&data&=&new&FNV1a().ComputeHash(Encoding.ASCII.GetBytes(String.Concat(node,&"-",&i)));
&&&&&&&&&&&&for&(int&h&=&0;&h&&&PartC&h++)
&&&&&&&&&&&&&&&&k.Add(BitConverter.ToUInt32(data,&h&*&KeyLength));
&&&&&&&&return&k;
public&class&FNV1a&:&HashAlgorithm
&&&&private&const&uint&Prime&=&;
&&&&private&const&uint&Offset&=&;
&&&&protected&uint&CurrentHashV
&&&&public&FNV1a()
&&&&&&&&this.HashSizeValue&=&32;
&&&&&&&&this.Initialize();
&&&&public&override&void&Initialize()
&&&&&&&&this.CurrentHashValue&=&O
&&&&protected&override&void&HashCore(byte[]&array,&int&ibStart,&int&cbSize)
&&&&&&&&int&end&=&ibStart&+&cbS
&&&&&&&&for&(int&i&=&ibS&i&&&&i++)
&&&&&&&&&&&&this.CurrentHashValue&=&(this.CurrentHashValue&^&array[i])&*&FNV1a.P
&&&&protected&override&byte[]&HashFinal()
&&&&&&&&return&BitConverter.GetBytes(this.CurrentHashValue);
测试过程:使用4个mamcached server配置,测试的几个key会分布在这些server上;删除一个server配置,测试key值仍会分配在剩余可用server上,可以维持原映射不变化;添加新的server,同样已有key值仍映射到相同server,不会变化
备注:删除server(或者server发生故障),添加新的server,Enyim都会重新构建映射索引(ServerPool.RebuildIndexes()),映射的一致性完全由完成映射的哈希算法保证
Test code:
IList&string&&servers&=&new&List&string&(new&string[]&{&
&&&&"192.168.1.100:800",&"192.168.1.201:800",&
&&&&"192.168.1.151:800",&"192.168.1.400:800"&});
DefaultNodeLocator&locator&=&new&DefaultNodeLocator();
locator.Initialize(servers);&//will rebuild the mapping indexes
string&key1&=&"42123",&key2&=&Guid.NewGuid().ToString();
Console.WriteLine("key:&{0},&server:&{1}",&key1,&locator.Locate(key1));
Console.WriteLine("key:&{0},&server:&{1}",&key2,&locator.Locate(key2));
//delete a server
for&(int&i&=&0;&i&&&servers.C&i++)
&&&&if&(servers[i]&!=&locator.Locate(key1)&&&&servers[i]&!=&locator.Locate(key2))
&&&&&&&&servers.RemoveAt(i);
&&&&&&&&break;
locator&=&new&DefaultNodeLocator();
locator.Initialize(servers);&//will rebuild the mapping indexes
Console.WriteLine("mappings&after&1&server&was&removed");
Console.WriteLine("key:&{0},&server:&{1}",&key1,&locator.Locate(key1));
Console.WriteLine("key:&{0},&server:&{1}",&key2,&locator.Locate(key2));
//add a new server
servers.Add(":11211");
locator&=&new&DefaultNodeLocator();
locator.Initialize(servers);&//will rebuild the mapping indexes
Console.WriteLine("mappings&after&new&server&was&added");
Console.WriteLine("key:&{0},&server:&{1}",&key1,&locator.Locate(key1));
Console.WriteLine("key:&{0},&server:&{1}",&key2,&locator.Locate(key2));
Conclusion: consistent hashing is well supported
Load balance test
Test code:
IList&string&&servers&=&new&List&string&(new&string[]&{
&&&& "192.168.1.100:800",&"192.168.1.201:800",&
&&&& "192.168.1.151:800",&"192.168.1.400:800"&});
DefaultNodeLocator&locator&=&new&DefaultNodeLocator();
locator.Initialize(servers);
int[]&mappings&=&new&int[4];&//to save the total count of mapped keys in the servers
Random&rd&=&new&Random();
for&(int&i&=&0;&i&&&1000000;&i++)
&&&&int&index&=&servers.IndexOf(locator.Locate(i&%&3&==&0&?&i.ToString()&:&i&%&3&==&1&?&rd.Next().ToString()
&&&&&&&& :&Guid.NewGuid().ToString()));
&&&&mappings[index]++;
for&(int&i&=&0;&i&&&servers.C&i++)
&&&&Console.WriteLine("server:&{0},&keys:&{1}",&servers[i],&mappings[i]);
Test results:
Enyim不像Memcached.Clientlibrary那样对服务器的负载提供可配置的支持,不过这一点可以尝试通过配置server list实现,例如两台server A和B,需要A承受75%的负载,可以尝试在servers的配置中,配置3个A和1个B(这个方法需要测试,并且注意server A会存在3个MemcachedNode节点的实例,每个实例均会应用pool设置,主要是minPoolSize、maxPoolSize)从测试效果来看,负载的分布在各种情况下均保持固定比例,负载分布很不平衡。按道理,如果无法实现负载的配置型,应当实现平均的分布负载,这是算法本身的缺陷
Conclusion: doe's not support configurable load balance, loads not balanced among servers
代码结构、处理方式
主要结构:
PooledSocket: 负责socket通讯,例如发送命令、读取回应消、数据。为了提高高并发情况下的吞吐量,socket一直保持连接状态,这也是memcached官方推荐的处理方式
MemcachedNode: 表示一个Memcached服务器节点,他主要负责维护当前节点的活动(可用)状态,维护与该节点通讯用的socket pool。MemcachedNode.Acquire()方法就是请求一个空闲的PooledSocket对象用于通讯作业
&& socket pool的维护通过内嵌类InternalPoolImpl实现。他使用一个空闲队列freeItems保存空闲的socket连接对象,Acquire()方法请求空闲socket时,从空闲队列中取出一个,socket对象使用完毕时通过ReleaseSocket方法放回队列中。仅在请求或者释放socket回队列时加锁。使用者不用显示调用ReleaseSocket方法将socket释放回pool中,创建PooledSocket时ReleaseSocket方法作为委托传给PooledSocket,PooledSocket Dispose时自动通过委托进行调用
ServerPool: 负责对所有server节点的维护,包括&& 1. key与节点的映射:LocateNode方法确定某个key值映射到哪个节点,SplitKeys确定一组key值分别映射到哪些节点(例如使用gets命令批量读取时使用),RebuildIndexes用于任何节点状态发生变化时,重建映射索引
&& 2. 节点状态的管理,具体处理过程如下:&&&&&& a). workingServers列表存放可用的工作节点,deadServers列表存放死节点(出现故障的)&&&&&& b). MemcachedNode创建新的socket或者从pool中请求socket时,如果发生socket异常,则将该MemcachedNode节点标记为不可用&&&&&& c). PooledSocket进行通讯时如果发生socket异常,会将该PooledSocket对象标记为不可用。PooledSocket使用完毕后都会执行Dispose方法,该方法中通过委托cleanupCallback(MemcachedNode在创建PooledSocket时传进来的)调用InternalPoolImpl的ReleaseSocket方法,将MemcachedNode标记为不可用&&&&&& d). 下一次请求中,MemcachedClient通过ServerPool调用LocateNode或者SplitKeys方法,将key映射到MemcachedNode时,如果节点被标记为不可用,则将节点从workingServers列表移入到deadServers列表中,重建映射索引,这样后续的请求就不会再将key值映射到这个不可用的节点上了&&&&&& e). ServerPool定期的对deadServers列表中的死节点检查状态(callback_isAliveTimer方法,调用MemcachedNode的Ping方法对节点状态进行检查),如果节点可用,则将节点从deadServers列表移入到workingServers列表中,并重建映射索引,这样这些状态已经恢复的节点将重新加入到工作节点中&&&&&& 这个状态管理机制存在一个数据一致性问题,如果某个节点仅仅因为网络故障而中断,之前映射到他上面的数据将被映射到其他节点上。网络恢复时该节点重新加入工作组中,因为一致性哈希算法的缘故,之前映射到他上面的key又将重新映射到这个节点。问题之一,如果该节点上的数据没有清除掉,这些数据可能已经是老版本、过时的数据了;问题之二,造成其他节点上存在另外版本的脏数据&&&&&& Enyim并没有充分考虑failover、failback机制,这是一个缺陷
Operations
Operations目录下的类,是对Memcached各种命令(命令名称、通讯协议)的封装,构造时传入ServerPool对象,通过他可以得到与各个key对应的MemcachedNode,再取得PooledSocket,完成各个命令的通讯,以及对结果的处理
KeyTransformers: 主要是与memcached server通讯时,对key值进行处理(是否需要哈希等,使用什么哈希算法等),默认使用的DefaultKeyTransformer并不进行哈希处理(仅仅检查key值不能包含特殊字符),这在memcached server跨应用使用时比较简单
Transcoders: 引用类型对象序列化的处理博客园团队
经过一周的努力,我们的&.NET跨平台之旅&取得了一个重要的进展&&基于.NET Core改写了开源的memcached .NET客户端EnyimMemcached,实现了Linux上访问memcached缓存,解决了跨平台.NET的缓存问题。
针对我们的应用场景,将实际应用迁移到部署在Linux服务器上的跨平台.NET(.NET Core)有两大障碍:一个障碍是Linux上访问SQL Server数据库,一个障碍是Linux上访问memcached缓存。第一个问题在苦等之后,终于被微软解决了,详见 ;而第二个问题,微软还没开始解决,目前的ASP.NET 5只支持进程内的内存缓存与redis,不支持memcached。但我们不想苦等了,选择了自己动手、丰衣足食,尝试自己解决这个问题。
我们用的memcached缓存客户端是EnyimMemcached,之前对它进行过,对源代码有些了解。用dnx基于.NET Core编译EnyimMemcached的源代码,出现了300多个编译错误,当时有点望而却步,但后来还是下定决心解决这些编译错误。
一类编译错误是对System.Configuration程序集的依赖,EnyimMemcached的配置是放在web.config中的,有不少代码依赖System.Configuration。而ASP.NET 5中根本没有web.config这个东东,corefx中自然也就没有System.Configuration的实现。为了解决这个问题,我们暂时放弃使用配置文件,通过硬编码进行配置,添加的主要代码如下:
IMemcachedClientConfiguration configuration = new MemcachedClientConfiguration(_loggger);
configuration.SocketPool.MinPoolSize = 20;
configuration.SocketPool.MaxPoolSize = 1000;
configuration.SocketPool.ConnectionTimeout = new TimeSpan(0, 0, 3);
configuration.SocketPool.ReceiveTimeout = new TimeSpan(0, 0, 3);
configuration.SocketPool.DeadTimeout = new TimeSpan(0, 0, 3);
一类编译错误是corefx(.NET Core Framework)中程序集的变化,比如:
IPEndPoint跑到了System.Net.Primitives程序集中
System.Security.Cryptography.HashAlgorithm跑到了System.Security.Cryptography.Algorithms程序集中
System.Threading.Timer成为了一个独立的程序集
一类编译错误是corefx中类库的变化,比如没有了System.Net.Dns.GetHostEntry(),需要改用System.Net.NameResolution程序集中的System.Net.Dns.GetHostAddressesAsync()。
还有一类最头疼的编译错误是corefx中没有二进制序列化(BinaryFormatter)的实现,而对于EnyimMemcached来说这是关键部分,对象的缓存读写全靠二进制序列化与反序列化。针对这个问题,我们改用Json.NET进行bson序列化与反序列化。
序列化实现代码如下:
protected virtual ArraySegment&byte& SerializeObject(object value)
using (var ms = new MemoryStream())
using (BsonWriter writer = new BsonWriter(ms))
JsonSerializer serializer = new JsonSerializer();
serializer.Serialize(writer, value);
return new ArraySegment&byte&(ms.ToArray(), 0, (int)ms.Length);
反序列化实现代码如下:
T ITranscoder.Deserialize&T&(CacheItem item)
if (item.Data == null || item.Data.Count == 0) return default(T);
using (var ms = new MemoryStream(item.Data.ToArray()))
using (BsonReader reader = new BsonReader(ms))
if(typeof(T).GetTypeInfo().ImplementedInterfaces.Contains(typeof(IEnumerable)))
reader.ReadRootValueAsArray = true;
JsonSerializer serializer = new JsonSerializer();
return serializer.Deserialize&T&(reader);
Json.NET的bson反序列有个麻烦的地方,对于集合类型需要专门设置ReadRootValueAsArray的值为true。当时在这个地方折腾了不少时间,没找到好的解决方法,只能用反射实现,就是上面代码中的&typeof(T).GetTypeInfo().ImplementedInterfaces.Contains(typeof(IEnumerable)) 。
在解决了这些编译错误并在开发环境中测试通过之后,我们就将改造后的EnyimMemcached应用到&.NET的跨平台之旅&的示例站点(/)上,调用代码如下:
public class TabNavService : ITabNavService
private ITabNavRepository _tabNavR
private IMemcachedClient _memcachedC
public TabNavService(
ITabNavRepository tabNavRepository,
IMemcachedClient memcachedClient)
_tabNavRepository = tabNavR
_memcachedClient = memcachedC
public async Task&IEnumerable&TabNav&& GetAll()
var result = await _memcachedClient.GetAsync&IEnumerable&TabNav&&(cacheKey);
if(!result.Success)
var tabNavs = await _tabNavRepository.GetAll();
await _memcachedClient.StoreAsync(StoreMode.Add, cacheKey, tabNavs, new TimeSpan(0, 0, 300));
return tabN
return result.V
_memcachedClient是通过&依赖注入&注入的,但是在注入时,我们自己给自己挖了一个坑:
services.AddTransient&IMemcachedClient, MemcachedClient&();
加了memcached缓存功能之后,示例站点在一台测试服务器(只有当前一个请求,没有其它请求)上运行正常,缓存读写正常。
但是一发布到的正式服务器上(有多个请求),请求发出后就一直处于等待状态,服务器无任何响应。在原以为大功告成的时刻却卡在了这个奇怪的问题上,这个滋味你懂的。
折腾了半天,实在找不到原因,找了个替罪羊&&可能是corefox中System.Net.Sockets在Linux上的实现对并发请求的处理有问题,准备暂时放弃。
就在这一刻,突然想到,MemcachedClient的构造函数中有初始化socket pool的操作,每创建一个MemcachedClient的实例时都要创建好多到memcached服务器的socket连接。难道是在注入时忘了使用单例,造成每个请求都要创建MemcachedClient的实例,太多的socket连接让kestrel服务器不堪重负。想到这一点,立马跑到电脑前打开代码一看,立马发现自己的坑坑自己不浅。改为单例后,问题立马解决。
services.AddSingleton&IMemcachedClient, MemcachedClient&();
于是,运行在Linux服务器上的示例站点()用上了memached缓存;于是,写了这篇博文分享自己坑自己的过程;于是,我们的.NET跨平台之旅迈上了一个新台阶。原文链接:
阅读排行榜

我要回帖

更多关于 enyim memcached 的文章

 

随机推荐