鲁能国际中心是不是欠PPTV这俩解说的钱

项目语言:C++
权限:read-only(如需更高权限请先加入项目)
Index: daemonize.py
===================================================================
--- daemonize.py (revision 0)
+++ daemonize.py (revision 225)
@@ -0,0 +1,290 @@
+#!/usr/bin/python
+# encoding: utf-8
+"""
+@version: ??
+@author: calmwu
+@license: Apache Licence
+@contact:
+@file: daemonize.py
+"""
+import os
+import sys
+import pwd
+import atexit
+import signal
+import time
+import subprocess
+from gamecloud_base.utils import singleton
+#http://blog-/2010/07/creating-a-daemon-process-in-python.html
def signal_handler(signo, frame):
    """Forward process signals to the daemonized application instance.

    SIGTERM asks the application to stop and SIGUSR1 asks it to reload;
    every other signal number is ignored.

    :param signo: number of the signal that was delivered.
    :param frame: current stack frame (unused; required by ``signal.signal``).
    """
    # ``Daemon`` is already wrapped by ``@singleton``, so calling it with no
    # arguments returns the existing instance; no need to re-wrap it.
    if signo == signal.SIGTERM:
        Daemon().get_appinstance().stop()
    elif signo == signal.SIGUSR1:
        Daemon().get_appinstance().reload()
@singleton
class Daemon(object):
    """Run an application object as a classic double-fork UNIX daemon.

    The application object must expose ``app_name`` and ``app_id`` attributes
    (both are used to build the pid-file path) as well as ``run()``,
    ``stop()`` and ``reload()`` methods.
    """

    def __init__(self,
                 app_instance=None,
                 chroot_directory=None,
                 working_directory="./",
                 umask=0o011,
                 uid=None,
                 gid=None,
                 stdin=None,
                 stdout=None,
                 stderr=None):
        """Store the daemon configuration.

        :param app_instance: application object to run (see class docstring).
        :param chroot_directory: directory to chroot into, or None to skip.
        :param working_directory: directory to chdir into before forking.
        :param umask: file-creation mask applied to the daemon.
        :param uid: user id to switch to; defaults to the current uid.
        :param gid: group id to switch to; defaults to the current gid.
        :param stdin: file object replacing stdin, or None for /dev/null.
        :param stdout: file object replacing stdout, or None for /dev/null.
        :param stderr: file object replacing stderr, or None for /dev/null.
        """
        self.app_instance = app_instance
        self.process_name = app_instance.app_name
        self.pid_file = "/tmp/{0}_{1}.pid".format(self.process_name, app_instance.app_id)
        self.chroot_directory = chroot_directory
        self.working_directory = working_directory
        self.umask = umask
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr
        self.uid = os.getuid() if uid is None else uid
        self.gid = os.getgid() if gid is None else gid
        self.action_funs = {
            'start': self._start,
            'stop': self._stop,
            'reload': self._reload,
        }

    def _change_root_directory(self):
        """Restrict file-system access to ``chroot_directory``.

        Only root may chroot; on any failure the process exits with -1.
        """
        try:
            os.chdir(self.chroot_directory)
            os.chroot(self.chroot_directory)
        except Exception as exc:
            err_info = "Unable to change root directory ({0})\n".format(exc)
            sys.stderr.write(err_info)
            sys.exit(-1)

    def _change_file_creation_mask(self):
        """Apply ``self.umask``; exit with -2 on failure."""
        try:
            os.umask(self.umask)
        except Exception as exc:
            err_info = "Unable to change creation mask ({0})\n".format(exc)
            sys.stderr.write(err_info)
            sys.exit(-2)

    def _change_woking_directory(self):
        """Change into ``self.working_directory``; exit with -3 on failure."""
        try:
            os.chdir(self.working_directory)
        except Exception as exc:
            err_info = "Unable to change working directory ({0})\n".format(exc)
            sys.stderr.write(err_info)
            sys.exit(-3)

    def _change_process_owner(self):
        """Switch to the configured gid/uid; exit with -3 on failure."""
        try:
            passwd_entry = pwd.getpwuid(self.uid)
            username = passwd_entry.pw_name
            print("{0} pwd({1}:{2}:{3})".format(self.process_name, username, self.uid, self.gid))
            # The group must be changed first: once the uid is dropped the
            # process may no longer be allowed to change its gid.
            os.setgid(self.gid)
            os.setuid(self.uid)
        except Exception as exc:
            err_info = "Unable to change process owner ({0})\n".format(exc)
            sys.stderr.write(err_info)
            sys.exit(-3)

    def _redirect_stream(self, system_stream, target_stream):
        """Point ``system_stream`` at ``target_stream`` (or at /dev/null).

        /dev/null is opened only when no target is given, and the temporary
        descriptor is closed again after it has been duplicated.
        """
        system_fd = system_stream.fileno()
        if target_stream is not None:
            os.dup2(target_stream.fileno(), system_fd)
            return
        devnull_fd = os.open(os.devnull, os.O_RDWR)
        try:
            os.dup2(devnull_fd, system_fd)
        finally:
            if devnull_fd != system_fd:
                os.close(devnull_fd)

    def _check_pidfile(self):
        """Tell whether it is safe to start a new daemon.

        :return: True when no pid file exists or no other matching process is
            listed by ``ps x``; False when another instance seems to be running.
        """
        if not os.path.exists(self.pid_file):
            return True
        # Filter the process list by name + id in Python instead of building
        # a shell pipeline out of the application-provided strings.
        proc = subprocess.Popen(['ps', 'x'], stdout=subprocess.PIPE)
        output = proc.communicate()[0]
        if not isinstance(output, str):
            output = output.decode('utf-8', 'replace')
        name = str(self.process_name)
        app_id = str(self.app_instance.app_id)
        own_pid = str(os.getpid())
        for line in output.splitlines():
            fields = line.split()
            if not fields or fields[0] == own_pid:
                # The process doing the check is not "another instance".
                continue
            if name in line and app_id in line and 'grep' not in line:
                return False
        return True

    def _write_pidfile(self):
        """Write the current pid to ``self.pid_file``; exit with -1 on failure."""
        try:
            with open(self.pid_file, 'w') as pid_fd:
                pid_fd.write(str(os.getpid()))
        except Exception:
            err_info = "Unable write pid to {0}\n".format(self.pid_file)
            sys.stderr.write(err_info)
            sys.exit(-1)

    def _remove_pidfile(self):
        """Delete the pid file; a file that is already gone is not an error."""
        try:
            os.remove(self.pid_file)
        except OSError:
            pass

    def _read_pid(self):
        """Return the pid stored in the pid file, or None if it is unreadable.

        A diagnostic is written to stderr when the file cannot be read or
        does not contain an integer.
        """
        try:
            with open(self.pid_file, 'r') as pid_fd:
                return int(pid_fd.read().strip())
        except Exception:
            err_info = "pidfile {0} does not exist. {1} not running?\n".format(self.pid_file, self.process_name)
            sys.stderr.write(err_info)
            return None

    def _daemon(self):
        """Detach from the controlling terminal using the double-fork recipe.

        Only the grandchild returns from this method; the original process
        and the intermediate child exit with status 0.
        """
        # Refuse to start when another instance appears to be running.
        if not self._check_pidfile():
            err_info = "{0} is running! please stop it!\n".format(self.process_name)
            sys.stderr.write(err_info)
            sys.exit(-1)
        for signo in (signal.SIGTSTP, signal.SIGTTOU, signal.SIGTTIN,
                      signal.SIGCHLD, signal.SIGHUP, signal.SIGPIPE):
            signal.signal(signo, signal.SIG_IGN)
        # Flush buffered output so it is not emitted again by the children.
        sys.stdout.flush()
        sys.stderr.flush()
        try:
            pid = os.fork()
        except OSError as exc:
            err_info = "fork first child failed! ({0})\n".format(exc)
            sys.stderr.write(err_info)
            sys.exit(-4)
        if pid != 0:
            # Original process: its job is done.
            sys.exit(0)
        # First child: become session leader, then fork again.
        os.setsid()
        try:
            pid = os.fork()
        except OSError as exc:
            err_info = "fork second child failed! ({0})\n".format(exc)
            sys.stderr.write(err_info)
            sys.exit(-4)
        if pid > 0:
            sys.exit(0)
        # Grandchild: this is the daemon.
        self._write_pidfile()
        atexit.register(self._remove_pidfile)
        self._redirect_stream(sys.stdin, self.stdin)
        self._redirect_stream(sys.stdout, self.stdout)
        self._redirect_stream(sys.stderr, self.stderr)
        signal.signal(signal.SIGTERM, signal_handler)
        signal.signal(signal.SIGUSR1, signal_handler)

    def _start(self):
        """Apply the process restrictions, daemonize and run the application."""
        if self.chroot_directory is not None:
            self._change_root_directory()
        self._change_file_creation_mask()
        self._change_woking_directory()
        self._change_process_owner()
        self._daemon()
        self.app_instance.run()

    def _stop(self):
        """Send SIGTERM to the process recorded in the pid file, then exit 0."""
        if not os.path.exists(self.pid_file):
            print('pid file[{0}] is not exist!'.format(self.pid_file))
        proc_id = self._read_pid()
        if proc_id is not None and proc_id > 0:
            try:
                os.kill(proc_id, signal.SIGTERM)
                print('send stop cmd to process[{0}]'.format(proc_id))
            except OSError as exc:
                # Stale pid file: the recorded process no longer exists.
                sys.stderr.write("Unable to signal process[{0}] ({1})\n".format(proc_id, exc))
            time.sleep(0.1)
        sys.exit(0)

    def _reload(self):
        """Send SIGUSR1 to the process recorded in the pid file, then exit 0."""
        proc_id = self._read_pid()
        # Without a readable pid there is nothing to signal.
        if proc_id is not None and proc_id > 0:
            try:
                os.kill(proc_id, signal.SIGUSR1)
            except OSError as exc:
                sys.stderr.write("Unable to signal process[{0}] ({1})\n".format(proc_id, exc))
            time.sleep(0.1)
        sys.exit(0)

    def do_action(self, action):
        """Run the handler registered for ``action`` ('start', 'stop', 'reload').

        An unknown action is reported on stderr and the process exits with 0.
        """
        if action in self.action_funs:
            action_func = self.action_funs[action]
            action_func()
        else:
            sys.stderr.write("action[{0}] is unknown!".format(action))
            sys.exit(0)

    def get_appinstance(self):
        """Return the application object passed to the constructor."""
        return self.app_instance
class Runner(object):
    """Minimal example application that can be driven by ``Daemon``."""

    def __init__(self, app_name, app_id=0):
        """Create the example application.

        :param app_name: name of the application.
        :param app_id: numeric instance id (optional, defaults to 0).
        """
        self.exit_flag = False
        self.m_appname = app_name
        # ``Daemon`` reads these two attributes to build its pid-file path.
        self.app_name = app_name
        self.app_id = app_id

    def appname(self):
        """Return the application name."""
        return self.m_appname

    def stop(self):
        """Ask the ``run`` loop to terminate."""
        print("Runner({0}) stop!".format(os.getpid()))
        self.exit_flag = True

    def reload(self):
        """Handle a reload request (the example only logs it)."""
        print("Runner({0}) reload!".format(os.getpid()))

    def run(self):
        """Sleep in one-second steps until ``stop`` has been called."""
        print("Runner({0}) start run!".format(os.getpid()))
        while not self.exit_flag:
            time.sleep(1)
        print("Runner run exit!")


if __name__ == '__main__':
    print(sys.argv[0])
    # Runner needs a name; it also ends up in the pid-file path.
    runner = Runner('daemonize')
    daemon = Daemon(runner, stdout=sys.stdout, stderr=sys.stderr)
    daemon.do_action('start')
Index: check_parameter.py
===================================================================
--- check_parameter.py (revision 0)
+++ check_parameter.py (revision 225)
@@ -0,0 +1,258 @@
+#! /usr/bin/python
+#coding=utf-8
+'''
+@version: ??
+@author: lippsu
+@contact:
+@software: PyCharm
+@file: check_parameter
+'''
+import re
+import types
+from inter_error import InterError
# QQ numbers (uin) with fewer than 5 or more than 14 digits are invalid.
# *_INVALID_L is the largest value that is still too small and *_INVALID_R
# the smallest value that is already too large (exclusive bounds).
# NOTE(review): the upper bounds were truncated to "00" in the extracted
# source; 10 ** 14 is derived from the "more than 14 digits" rule - confirm.
UIN_INVALID_L = 9999
UIN_INVALID_R = 10 ** 14
# APPIDs with fewer than 4 or more than 14 digits are invalid.
APPID_INVALID_L = 999
APPID_INVALID_R = 10 ** 14
class CheckParameter(object):
    """Static helpers that validate request parameters (uin, appid, e-mail...)."""

    # Aliases that work on both Python 2 (basestring / long) and Python 3.
    try:
        _string_types = (basestring,)
        _integer_types = (int, long)
    except NameError:
        _string_types = (str,)
        _integer_types = (int,)

    uin_pattern = re.compile(r"^[1-9][0-9]{3,10}[0-9]$")
    appid_pattern = re.compile(r"^[1-9][0-9]{2,10}[0-9]$")
    email_pattern = re.compile(r'^[\w\d]+[\d\w\_\.]+@([\d\w]+)\.([\d\w]+)(?:\.[\d\w]+)?$')
    # First run of digits that does not start with 0; compiled once.
    _cookie_uin_pattern = re.compile(r"\D*([1-9]\d*)", re.I)

    @staticmethod
    def _check_uin_int(uin):
        """Return True when the integer ``uin`` lies strictly between the uin bounds."""
        return (uin > UIN_INVALID_L) and (uin < UIN_INVALID_R)

    @staticmethod
    def _check_appid_int(appid):
        """Return True when the integer ``appid`` lies strictly between the appid bounds."""
        return (appid > APPID_INVALID_L) and (appid < APPID_INVALID_R)

    @staticmethod
    def _check_uin_str(uin):
        """Return True when the string ``uin`` matches ``uin_pattern``."""
        return re.match(CheckParameter.uin_pattern, uin) is not None

    @staticmethod
    def _check_appid_str(appid):
        """Return True when the string ``appid`` matches ``appid_pattern``."""
        return re.match(CheckParameter.appid_pattern, appid) is not None

    @staticmethod
    def check_uin(uin):
        """Validate a uin given as an integer or a string; other types are invalid."""
        if CheckParameter.is_int(uin):
            return CheckParameter._check_uin_int(uin)
        elif CheckParameter.is_str(uin):
            return CheckParameter._check_uin_str(uin)
        return False

    @staticmethod
    def check_appid(appid):
        """Validate an appid given as an integer or a string; other types are invalid."""
        if CheckParameter.is_int(appid):
            return CheckParameter._check_appid_int(appid)
        elif CheckParameter.is_str(appid):
            return CheckParameter._check_appid_str(appid)
        return False

    @staticmethod
    def check_email(email):
        """Return True for a string of at most 64 characters matching ``email_pattern``."""
        if not CheckParameter.is_str(email):
            return False
        if not CheckParameter.check_str_len(email, 64):
            return False
        return re.match(CheckParameter.email_pattern, email) is not None

    @staticmethod
    def check_phone(phone):
        """Check type and length only: a string of at most 32 characters."""
        if not CheckParameter.is_str(phone):
            return False
        return CheckParameter.check_str_len(phone, 32)

    @staticmethod
    def check_url(url):
        """Check type and length only: a string of at most 128 characters."""
        if not CheckParameter.is_str(url):
            return False
        return CheckParameter.check_str_len(url, 128)

    @staticmethod
    def check_str_len(ag_str, limit):
        """Return True when ``ag_str`` is no longer than ``limit``."""
        return len(ag_str) <= limit

    @staticmethod
    def is_str(para):
        """Return True when ``para`` is a text string."""
        return isinstance(para, CheckParameter._string_types)

    @staticmethod
    def is_int(para):
        """Return True when ``para`` is an integer (including Python 2 ``long``)."""
        return isinstance(para, CheckParameter._integer_types)

    @staticmethod
    def start_with(ag_str, prefix):
        """Return True when ``ag_str`` starts with a non-empty ``prefix``."""
        if not prefix:
            return False
        return ag_str.startswith(prefix)

    @staticmethod
    def check_dict_kv(data, dict_tpl):
        """Check that ``data`` contains the keys of ``dict_tpl`` with allowed types.

        ``dict_tpl`` maps each required key to an iterable of types (or tuples
        of types); the value stored in ``data`` must be an instance of at
        least one of them.

        :raises InterError: E_INVALID_PARA when an argument is not a dict or a
            value has none of the allowed types; E_KEY_UNEXIST when a key
            is missing from ``data``.
        """
        if not isinstance(data, dict) or not isinstance(dict_tpl, dict):
            raise InterError(InterError.E_INVALID_PARA, "data and dict_tpl must be both dict")
        for key in dict_tpl:
            if key not in data:
                raise InterError(InterError.E_KEY_UNEXIST, "key[%s] does not exist" % key)
            value = data[key]
            for item in dict_tpl[key]:
                if isinstance(value, item):
                    break  # one allowed type matched
            else:
                raise InterError(InterError.E_INVALID_PARA, "dict[%s] is not type of %s" % (key, dict_tpl[key]))

    @staticmethod
    def is_numberic(char):
        """Return True when ``char`` compares between '0' and '9' inclusive."""
        return '0' <= char <= '9'

    @staticmethod
    def parse_cookie_uin(uin):
        """Extract the numeric uin from a cookie value.

        Non-digit prefixes/suffixes and leading zeros are dropped, e.g.
        ``'o0123456'`` gives ``'123456'``.

        :return: the digit string, or False when ``uin`` is not a non-empty
            string or contains no digit run that starts with 1-9.
        """
        if (not uin) or (not CheckParameter.is_str(uin)):
            return False
        m = CheckParameter._cookie_uin_pattern.search(uin)
        if not m:
            return False
        real_uin = m.group(1)  # group 1 is the captured digit run
        if not real_uin:
            return False
        return real_uin
if __name__ == "__main__":
    # Manual smoke test for CheckParameter.
    # NOTE(review): several sample values below were lost when this file was
    # extracted; the numeric/phone/url samples are representative
    # placeholders, not the original data.
    for cookie in ('123', 'a123', '123b', 'a123b', 'ab', 'a12c35b', 'ab'):
        print(CheckParameter.parse_cookie_uin(cookie))
    print('test')
    base = '_abc'
    for prefix in ('', '_', ' _', '_a', ' _a', '_b', '_abcd'):
        print(CheckParameter.start_with(base, prefix))
    cgw_result_fmt = {
        "returnCode": [int],
        "returnMessage": [basestring],
        "data": [(list, dict)]
    }
    test_dict = {
        "returnCode": 9,
        "returnMessage": '中文',
        "data": []
    }
    CheckParameter.check_dict_kv(test_dict, cgw_result_fmt)
    i_appids = [1, 111, 1000, 10 ** 14]
    s_appids = ['', '10', '100', '1000', '10000', '100000000000', '1000000000000000']
    i_uins = [1, 11, 111, 10000, 10 ** 14]
    s_uins = ['01', '10', '100', '1000', '10000', '100000000000', '1000000000000000']
    emails = ['lipp@', 'lipp@aa.', 'lipp@', '', 's.l.', '@b.com', 'a@.com', 'a@b.']
    phones = ['+86-136-', '', '0' * 33]
    urls = ['', '12345', 'x' * 129]
    samples = (
        (i_appids, CheckParameter.check_appid),
        (s_appids, CheckParameter.check_appid),
        (i_uins, CheckParameter.check_uin),
        (s_uins, CheckParameter.check_uin),
        (emails, CheckParameter.check_email),
        (phones, CheckParameter.check_phone),
        (urls, CheckParameter.check_url),
    )
    for values, checker in samples:
        for v in values:
            print("{0} {1}".format(v, checker(v)))
Index: http_client.py
===================================================================
--- http_client.py (revision 0)
+++ http_client.py (revision 225)
@@ -0,0 +1,235 @@
+#! /usr/bin/python
+#coding=utf-8
+'''
+@version: ??
+@author: lippsu
+@contact:
+@software: PyCharm
+@file: http_client2
+'''
+import requests
+from gamecloud_base.log_module import logM
+import json
+#from utils.logging.logging_stub import LoggerModule as logM
+'''
+# header example
+headers = {
+    'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
+    'Accept-Encoding':'gzip, deflate, sdch',
+    'Accept-Language':'zh-CN,zh;q=0.8',
+    'Connection':'keep-alive',
+    'Host':'',
+    'Upgrade-Insecure-Requests':'1',
+    'User-Agent':'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0. Safari/537.36'
+}
+# body example
+body = {
+    'csrfmiddlewaretoken':'',
+    'next':'/',
+    'username':'',
+    'password':'pass'
+}
+#proxy example
+my_proxy = {
+    "http": "http://web-:8080",
+    "https": "http://web-:8080"
+}
+'''
class HttpClient(object):
    '''Thin wrapper around ``requests.Session``.

    1. Supports short connections (one session per request) and long
       connections (one session for the lifetime of the client).
    2. ``get`` returns the response text.
    3. ``post`` returns decoded JSON or the response text.
    '''

    def __init__(self, short_conn=True, timeout=6, headers=None, proxy=None, is_https=None, log_type='default'):
        '''Store the configuration; a long-connection client opens its session now.

        :param short_conn: True to open and close a session around every request.
        :param timeout: per-request timeout in seconds.
        :param headers: dict of extra HTTP headers, or None.
        :param proxy: ``requests`` proxies dict, or None.
        :param is_https: when true, certificate verification is switched on.
        :param log_type: value passed to the logger as ``app_name``.
        '''
        self.url = None
        self.body = None
        self.short_conn = short_conn
        self.headers = headers
        self.proxy = proxy
        self.is_https = is_https
        self.timeout = timeout
        self._response = None
        self.__session = None
        self._log_type = log_type
        if not self.short_conn:  # long connection: one session for the client
            self.__new_session()

    def __del__(self):
        self.close_session()

    def __reset_session(self):
        '''Drop the previous response and session, keeping the configuration.'''
        # The configuration must survive the reset, otherwise
        # __init_session has nothing left to apply.
        self._response = None
        self.close_session()

    def __init_session(self):
        '''Apply the stored headers, proxies and verify flag to the session.'''
        if self.headers:
            self.__session.headers.update(self.headers)
        if self.proxy:
            self.__session.proxies = self.proxy
        if self.is_https:
            self.__session.verify = True
        # The timeout is passed with each request, so nothing to set here.

    def __new_session(self):
        '''Replace the current session with a freshly configured one.'''
        self.__reset_session()
        self.__session = requests.Session()
        self.__init_session()

    def close_session(self):
        '''Close the current session, if any; errors are only printed.'''
        try:
            if self.__session is not None:
                self.__session.close()
        except Exception as e:
            print('close http error: {0}'.format(e))
        finally:
            self.__session = None

    def set_timeout(self, timeout):
        '''Set the timeout (seconds) used for subsequent requests.'''
        self.timeout = timeout

    def set_headers(self, headers):
        '''Replace the configured headers, on the open session too if there is one.'''
        self.headers = headers
        if self.__session is not None:
            self.__session.headers = headers if headers is not None else {}

    def add_http_header_item(self, key, value):
        '''Add or overwrite one HTTP header.'''
        if self.headers is None:
            self.headers = {}
        self.headers[key] = value
        if self.__session is not None:
            self.__session.headers.update({key: value})

    def clear_http_header_item(self, key):
        '''Clear one HTTP header by setting its value to None.'''
        self.add_http_header_item(key, None)

    def set_proxies(self, proxy):
        '''Replace the configured proxies, on the open session too if there is one.'''
        self.proxy = proxy
        if self.__session is not None:
            self.__session.proxies = proxy if proxy is not None else {}

    def get_cookie(self, key):
        '''Return cookie ``key`` from the session, else from the last response.

        Returns None when neither a session nor a response is available.
        '''
        if self.__session is not None:
            return self.__session.cookies.get(key)
        if self._response is not None:
            return self._response.cookies.get(key)
        return None

    def get(self, url, body=None):
        '''Issue a GET request.

        :param url: target URL.
        :param body: optional query parameters; falls back to ``self.body``.
        :return: False on failure, ``response.text`` on HTTP 200.
        '''
        ret = False
        try:
            if self.short_conn:
                self.__new_session()
            params = body if body is not None else self.body
            logM.debug("+++Get url(%s) : params(%s).", url, params, app_name=self._log_type)
            self._response = self.__session.get(url, params=params, timeout=self.timeout)
            if self.get_status_code() == 200:
                ret = self.get_response_text()
            else:
                logM.error("---Invoke Failed! Return from url[%s] status_code[%s] response_text[%r].",
                           url, self.get_status_code(), self.get_response_text(), app_name=self._log_type)
        except Exception as e:
            logM.error("---Catch exception. Get url(%s) : exception(%s)", url, str(e), app_name=self._log_type)
        finally:
            if self.short_conn:
                self.close_session()
        return ret

    def post(self, url, body, proto_json=True):
        '''Issue a POST request.

        :param url: target URL.
        :param body: request body passed to ``requests`` as ``data``.
        :param proto_json: when True the HTTP 200 body is JSON-decoded,
            otherwise the raw text is returned.
        :return: the decoded data / text, or False on any failure.
        '''
        ret = False
        try:
            if self.short_conn:
                self.__new_session()
            logM.debug("+++Post to url(%s) : params(%s).", url, body, app_name=self._log_type)
            self._response = self.__session.post(url, data=body, timeout=self.timeout)
            if self.get_status_code() == 200:
                if proto_json:
                    ret = self.get_response_unjson()
                else:
                    ret = self.get_response_text()
            else:
                logM.error("---Invoke Failed! Return from url[%s] status_code[%s] response_text[%r].",
                           url, self.get_status_code(), self.get_response_text(), app_name=self._log_type)
        except Exception as e:
            logM.error("---Catch exception. Post url(%s) : exception(%s)", url, str(e), app_name=self._log_type)
        finally:
            if self.short_conn:
                self.close_session()
        return ret

    def get_response(self):
        '''Return the last ``requests.Response`` (None before any request).'''
        return self._response

    def get_response_text(self):
        '''Return the text of the last response, or None without a response.'''
        # A Response is falsy for 4xx/5xx statuses, so test against None.
        if self._response is None:
            return None
        return self._response.text

    def get_response_unjson(self):
        '''Return the JSON-decoded last response, or None without a response.'''
        if self._response is None:
            return None
        return self._response.json()

    def get_status_code(self):
        '''Return the status code of the last response, or None without one.'''
        if self._response is None:
            return None
        return self._response.status_code
'''
@staticmethod
def post_once(req_url, dict_args):
# success: decode js
fail: return False
ret = False
h_session = requests.Session()
json_args = json.dumps(dict_args)
#logM.debug(&+++Post to url(%s) : params %s & ,req_url, json_args, app_name=&default&)
res = h_session.post(req_url, data=json_args)
if res.status_code == 200:
result = res.json()
ret = result
#logM.debug 不能用百分号%,
且不能用括号
#logM.debug(&---Invoke Successed! Return from url(%s) : ret=%s & , req_url, result, app_name=&default&)
print &---Invoke Failed! Return from url(%s) : res[%s]& % (req_url, res)
#logM.debug(&---Invoke Failed! Return from url(%s) : res[%s]& , req_url, res, app_name=&default&)
except Exception,ex:
print &---Catch exception. Post url(%s) : exception[%s] & % (req_url, ex)
#logM.error(&---Catch exception. Post url(%s) : exception[%s] &, req_url, ex, app_name=&default&)
h_session.close()
return ret
'''
if __name__ == "__main__":
    # NOTE(review): the proxy host and the request URL were truncated when
    # this file was extracted; fill in real values before running the demo.
    my_proxy = {
        "http": "http://web-:8080",
        "https": "http://web-:8080"
    }
    http_client = HttpClient(proxy=my_proxy)
    http_client.get('/')  # the method is ``get``; there is no ``GET``
    print("%r" % http_client.get_response_text())
\ No newline at end of file
Index: parameter_check.py
===================================================================
--- parameter_check.py (revision 0)
+++ parameter_check.py (revision 225)
@@ -0,0 +1,333 @@
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+'''
+Description:
This module provides functions to check parameters, including parameter's type and value.
You need to define parameter template firstly. The module will use it to match the input parameters.
Initial version
+'''
+import sys
+import os
+import re
+import socket
+import time
# Public API of this module (what ``from parameter_check import *`` exports).
__all__ = [
    "Attribute",
    "is_interger",
    "is_float",
    "is_string",
    "is_ipv4",
    "is_boolean",
    "regexCheck",
    "is_list",
    "is_dictionary",
    "is_date",
    "is_uin",
    "is_appid",
    "is_email",
    "check_parameters",
    "check_single_parameter",
]
class Attribute:
    """Template describing one expected parameter.

    When ``type`` or ``level`` is not one of the supported names, all four
    attributes are set to None instead of raising.
    """
    type_list = ["int", "float", "string", "boolean", "ipv4", "regex", "list", "dictionary", "date", "uin", "appid", "email"]
    level_list = ["mandatory", "optional"]

    def __init__(self, name, type, level, value=None):
        """Build a template.

        :param name: parameter name.
        :param type: one of ``Attribute.type_list``.
        :param level: one of ``Attribute.level_list``.
        :param value: optional value constraint handed to the type checker.
        """
        if not (self._type_check(type) and self._level_check(level)):
            self.name = None
            self.type = None
            self.level = None
            self.value = None
        else:
            self.name = name
            self.type = type
            self.level = level
            self.value = value

    @staticmethod
    def _type_check(type):
        """Return True when ``type`` is a supported type name."""
        return type in Attribute.type_list

    @staticmethod
    def _level_check(level):
        """Return True when ``level`` is a supported level name."""
        return level in Attribute.level_list
def is_interger(data, value=None):
    """Check that ``data`` is an integer that satisfies ``value``.

    :param data: object to check.
    :param value: None for "any integer", a ``(min, max)`` tuple for an
        inclusive range, or a list (or ``range`` object) of allowed values.
    :return: True or False; any other kind of ``value`` gives False.
    """
    try:
        integer_types = (int, long)  # Python 2
    except NameError:
        integer_types = (int,)
    if not isinstance(data, integer_types):
        return False
    if value is None:
        return True
    if isinstance(value, tuple):
        return value[0] <= data <= value[1]
    # ``type(range(0))`` is ``list`` on Python 2 and ``range`` on Python 3.
    if isinstance(value, (list, type(range(0)))):
        return data in value
    return False
def is_float(data, value=None):
    """Check that ``data`` is a float that satisfies ``value``.

    :param data: object to check.
    :param value: None for "any float", a ``(min, max)`` tuple for an
        inclusive range, or a list of allowed values.
    :return: True or False; any other kind of ``value`` gives False.
    """
    if not isinstance(data, float):
        return False
    if value is None:
        return True
    if isinstance(value, tuple):
        return value[0] <= data <= value[1]
    if isinstance(value, list):
        return data in value
    return False
def is_string(str, value=None):
    """Check that ``str`` is a text string that satisfies ``value``.

    :param str: object to check (the name shadows the builtin but is kept for
        compatibility with keyword callers).
    :param value: None for "any string", a ``(min_len, max_len)`` tuple of
        integers for an inclusive length range, or any other container of
        allowed values.
    :return: True or False.
    """
    try:
        string_types = basestring  # Python 2
    except NameError:
        string_types = type("")  # the parameter shadows the builtin ``str``
    if not isinstance(str, string_types):
        return False
    if value is None:
        return True
    if (isinstance(value, tuple) and len(value) == 2
            and all(isinstance(bound, int) for bound in value)):
        return value[0] <= len(str) <= value[1]
    return str in value
def is_ipv4(ip, value=None):
    """Check that ``ip`` is accepted by ``socket.inet_aton`` and allowed by ``value``.

    :param ip: address string to check.
    :param value: optional container of allowed addresses.
    :return: True or False; invalid or non-string input gives False.
    """
    try:
        socket.inet_aton(ip)
        return ip in value if value is not None else True
    except (socket.error, TypeError, ValueError):
        return False
def is_boolean(data, value=None):
    """Return True when ``data`` is a bool and, if given, is contained in ``value``."""
    if not isinstance(data, bool):
        return False
    return data in value if value is not None else True
def is_list(data, value=None):
    """Return True when ``data`` is a list and, if ``value`` is given, equals it."""
    if not isinstance(data, list):
        return False
    return data == value if value is not None else True
def is_dictionary(data, value=None):
    """Return True when ``data`` is a dict and, if ``value`` is given, equals it."""
    if not isinstance(data, dict):
        return False
    return data == value if value is not None else True
def is_date(data, value=None):
    """Return True when ``data`` is a ``YYYY-MM-DD`` date string.

    :param data: string to parse with the format ``%Y-%m-%d``.
    :param value: unused; present so all checkers share one signature.
    """
    try:
        time.strptime(data, "%Y-%m-%d")
        return True
    except (ValueError, TypeError):
        return False
def regexCheck(data, value=None):
    """Check ``data`` against the regular expression ``value``.

    :param data: string to test; it is matched from its start (``re.match``).
    :param value: pattern string; None accepts everything, a non-``str``
        pattern rejects everything.
    :return: True or False.
    """
    if value is None:
        return True
    if not isinstance(value, str):
        return False
    return re.compile(value).match(data) is not None
def is_uin(data, value=None):
    """Return True when ``data`` is an integer QQ number with 5 to 14 digits.

    :param data: object to check.
    :param value: unused; present so all checkers share one signature.
    """
    # Exclusive bounds: 9999 is the largest too-short value and 10 ** 14 the
    # smallest too-long one.
    # NOTE(review): the upper bound was truncated in the extracted source and
    # is derived from the 14-digit rule stated for these ids - confirm.
    UIN_INVALID_L = 9999
    UIN_INVALID_R = 10 ** 14
    # Compare against the bounds instead of materialising a range() holding
    # about 1e14 integers.
    return is_interger(data, (UIN_INVALID_L + 1, UIN_INVALID_R - 1))
def is_appid(data, value=None):
    """Return True when ``data`` is an integer APPID with 4 to 14 digits.

    :param data: object to check.
    :param value: unused; present so all checkers share one signature.
    """
    # Exclusive bounds: 999 is the largest too-short value and 10 ** 14 the
    # smallest too-long one.
    # NOTE(review): the upper bound was truncated in the extracted source and
    # is derived from the 14-digit rule stated for these ids - confirm.
    APPID_INVALID_L = 999
    APPID_INVALID_R = 10 ** 14
    # Compare against the bounds instead of materialising a range() holding
    # about 1e14 integers.
    return is_interger(data, (APPID_INVALID_L + 1, APPID_INVALID_R - 1))
def is_email(data, value=None):
    """Return True when ``data`` looks like an e-mail address of 1-64 characters.

    :param data: object to check.
    :param value: unused; present so all checkers share one signature.
    """
    # NOTE(review): the length constraint read (64, 64), which only accepts
    # strings of exactly 64 characters; (1, 64) is assumed to be the
    # intended "at most 64 characters" limit - confirm.
    try:
        if not is_string(data, (1, 64)):
            return False
        email_pattern = r'^[\w\d]+[\d\w\_\.]+@([\d\w]+)\.([\d\w]+)(?:\.[\d\w]+)?$'
        return bool(regexCheck(data, email_pattern))
    except Exception:
        return False
# Maps each Attribute type name to the function that validates it.
func_register = {
    "int": is_interger,
    "float": is_float,
    "string": is_string,
    "ipv4": is_ipv4,
    "boolean": is_boolean,
    "regex": regexCheck,
    "list": is_list,
    "dictionary": is_dictionary,
    "date": is_date,
    "uin": is_uin,
    "appid": is_appid,
    "email": is_email
}
def check_parameters(params, templates, mandatory_flag=True):
    '''
    Check every template in ``templates`` against ``params``.

    Parameters:
        params:
            dictionary, the input parameters
            E.g. params = {"nonce":1, "sid":"abc", "tt":True, "ip":"1.1.1.1", "test":"1234"}
        templates:
            dictionary, name -> Attribute(name, type, level, value)
            E.g.
            attribute_templates = {
                "nonce":Attribute("nonce", "int", "mandatory", [1]),
                "sid":Attribute("sid", "string", "mandatory"),
                "tt":Attribute("tt", "boolean", "mandatory"),
                "ip":Attribute("ip", "ipv4", "mandatory"),
                "test":Attribute("test", "regex", "mandatory", r"^[1-9]{2,3}$")
            }
        mandatory_flag:
            True, a missing 'mandatory' parameter is an error.
            False, 'level' is ignored; parameters are checked only when present.

    Returns:
        (0, None)   every parameter that is present has a valid type and value.
        (-1, name)  mandatory parameter ``name`` is missing from params.
        (-2, name)  parameter ``name`` has an invalid type or value.
        (-9, None)  an exception happened while checking.
    '''
    global func_register
    try:
        for tpl_name, tpl_attr in templates.items():
            if tpl_name not in params:
                if mandatory_flag and tpl_attr.level == 'mandatory':
                    return (-1, tpl_name)
                continue
            checker = func_register[tpl_attr.type]
            if not checker(params[tpl_name], tpl_attr.value):
                return (-2, tpl_name)
    except Exception:
        # Any failure (bad template, unknown type...) is reported as -9
        # instead of being raised to the caller.
        return (-9, None)
    return (0, None)
def check_single_parameter(param_name, param_value, templates):
    '''
    Check one parameter against its template.

    Parameters:
        param_name:
            string, the name of parameter to be checked. E.g. "nonce"
        param_value:
            the real value of parameter to be checked. E.g. 123456
        templates:
            dictionary, name -> Attribute(name, type, level, value)
            E.g.
            attribute_templates = {
                "nonce":Attribute("nonce", "int", "mandatory", [1]),
                "sid":Attribute("sid", "string", "mandatory")
            }

    Returns:
        NOTE(review): the return statements were lost in the extracted source;
        the codes below mirror check_parameters - confirm against callers.
        0   the parameter has a valid type and value.
        -1  no definition for param_name in the templates.
        -2  the parameter has an invalid type or value.
        -9  an exception happened while checking.
    '''
    global func_register
    try:
        if param_name not in templates:
            return -1
        template = templates[param_name]
        if func_register[template.type](param_value, template.value):
            return 0
        return -2
    except Exception:
        return -9
if __name__ == "__main__":
    # Small demonstration of check_parameters.
    attribute_templates = {
        "nonce": Attribute("nonce", "int", "mandatory", [1, 2]),
        "sid": Attribute("sid", "string", "mandatory", ["abc"]),
        "tt": Attribute("tt", "boolean", "mandatory"),
        "ip": Attribute("ip", "ipv4", "mandatory"),
        "test": Attribute("test", "regex", "mandatory", r"^[1-9]{2,3}$")
    }
    params = {"nonce": 1, "sid": "abc", "tt": True, "ip": "1.1.1.1", "test": "123"}
    result, param_name = check_parameters(params, attribute_templates)
    # check_parameters reports success with code 0.
    if result == 0:
        print("OK")
    else:
        print("Er")
        print(param_name)
\ No newline at end of file
Index: __init__.py
===================================================================
--- __init__.py (revision 0)
+++ __init__.py (revision 225)
@@ -0,0 +1,28 @@
+#! /usr/bin/python
+#coding=utf-8
+'''
+@version: ??
+@author: calmwu
+@contact:
+@software: PyCharm
+@file: __init__.py
+'''
+from gamecloud_base.utils.anti_hack import AntiHack
+from gamecloud_base.utils.check_parameter import CheckParameter
+from gamecloud_base.utils.daemonize import Daemon
+from gamecloud_base.utils.dc_report import DcReport
+from gamecloud_base.utils.enum import Enum
+from gamecloud_base.utils.inter_error import InterError
+from gamecloud_base.utils.http_client import HttpClient
+from gamecloud_base.utils.import_mod import import_and_get_module
+from gamecloud_base.utils.func_help import compose_inputargs
+from gamecloud_base.utils.func_help import parse_cookie_uin
+from gamecloud_base.utils.singleton import singleton
+from gamecloud_base.utils.parameter_check import *
Index: anti_hack.py
===================================================================
--- anti_hack.py (revision 0)
+++ anti_hack.py (revision 225)
@@ -0,0 +1,60 @@
+#! /usr/bin/python
+#coding=utf-8
+'''
+@version: ??
+@author: lippsu
+@contact:
+@software: PyCharm
+@file: anti_csrf
+'''
def qzap_hash_time33(ag_str):
    '''
    Time33 hash.

    @param ag_str: text to hash; each character contributes ``ord(c)``.
    @return: int in the range 0 .. 0x7fffffff (5381 for an empty string).
    '''
    digest = 5381
    for c in ag_str:
        # ``<<`` binds tighter than ``&``: shift first, then mask to 31 bits.
        digest = ((digest << 5) & 0x7fffffff) + ord(c) + digest
    return digest & 0x7fffffff
class AntiHack(object):
    '''Helpers against CSRF and XSS.'''

    @staticmethod
    def anti_csrf(skey, token):
        '''
        Verify a CSRF token.

        @skey: skey or pskey
        @token: token from client; an int, or a string holding an int
        return: bool, True when the token equals the Time33 hash of skey
        '''
        try:
            string_types = basestring  # Python 2
        except NameError:
            string_types = str
        if isinstance(token, string_types):
            # Tokens taken from a request arrive as text; a non-numeric
            # string can never match.
            try:
                token = int(token.strip())
            except ValueError:
                return False
        return qzap_hash_time33(skey) == token

    @staticmethod
    def remove_xss(val):
        '''
        Strip the characters < > " and ' from a string.

        Non-string values are returned unchanged.
        '''
        # NOTE(review): three of the four stripped characters were garbled in
        # the extracted source; < > and " are inferred from the demo's
        # expected output and the usual HTML meta-characters - confirm.
        try:
            string_types = basestring  # Python 2
        except NameError:
            string_types = str
        if not isinstance(val, string_types):
            return val
        for unsafe in ('<', '>', '"', "'"):
            val = val.replace(unsafe, "")
        return val
if __name__ == "__main__":
    # Small demonstration of the hash and of the XSS filter.
    skey = '@lmVYUSzZ0'
    g_tk = qzap_hash_time33(skey)
    unicodestr = u'hehe呵呵'
    print("%r" % unicodestr)
    xssunicode = AntiHack.remove_xss(unicodestr)
    print("%r" % xssunicode)
    print(g_tk)
    xss = "<stript>alert(\'xss\')</stript>"
    print(AntiHack.remove_xss(xss))  # striptalert(xss)/stript
\ No newline at end of file
Index: dc_report.py
===================================================================
--- dc_report.py (revision 0)
+++ dc_report.py (revision 225)
@@ -0,0 +1,195 @@
+#!/usr/bin/env python
+#encoding=utf-8
+#-------------------------------
+# dcapi for python version 1.0.0
+# test passed on python 2.6.6
+# cloudyang create
+# ------------------------------
+import os
+import time
+import platform
+import traceback
+from ctypes import *
+from ctypes import cdll
+from gamecloud_base.log_module import logM
class CLogger:
    '''ctypes wrapper around the native dcapi reporting library.

    The shared library matching the interpreter's architecture is loaded
    when the class body is executed.
    '''
    systemBit = platform.architecture()
    current_dir = os.path.dirname(__file__)
    if systemBit[0] == '64bit':
        lib_path = os.path.abspath(os.path.join(current_dir, "../lib/dcapi_lib/libdcapi_c_64.so"))
        __dcapi = cdll.LoadLibrary(lib_path)
    elif systemBit[0] == '32bit':
        lib_path = os.path.abspath(os.path.join(current_dir, "../lib/dcapi_lib/libdcapi_c_32.so"))
        __dcapi = cdll.LoadLibrary(lib_path)

    # method constraint
    __dcapi.clogger_getLastErr.argtypes = [c_void_p]
    __dcapi.clogger_getLastErr.restype = c_char_p
    __dcapi.clogger_create.restype = c_void_p
    __dcapi.clogger_mcreate.restype = c_void_p
    __dcapi.clogger_writeBaselog.argtypes = [c_void_p, c_char_p, c_uint, c_ubyte, c_ubyte, c_ulong]
    __dcapi.clogger_writeBaselog.restype = c_int
    __dcapi.clogger_writeModulelog.argtypes = [c_void_p, c_int, c_int, c_char_p, c_char_p, c_char_p, c_int, c_int, c_int, c_int, c_int]
    __dcapi.clogger_writeModulelog.restype = c_int
    __dcapi.clogger_destroy.argtypes = [c_void_p]

    # sock type define
    SOCK_UNIX = 0
    SOCK_TCP = 1
    SOCK_UDP = 2

    # protocol type define
    PRO_STRING = 0
    PRO_BINARY = 1

    def __init__(self, log_type='default'):
        self.__log_type = log_type
        # Native handle; stays None until init()/init_mod() succeeds, so that
        # __del__ is safe on an object that was never initialised.
        self.__dcobj = None

    def init(self, logname, socktype=SOCK_UNIX):
        '''
        Initialise a non-module-call clogger object.

        @param logname: unique identifier of the reporter
        @param socktype: protocol between api and agent, see the SOCK_* constants
        @return: 0 on success, -1 on failure (see get_last_err)
        NOTE(review): the return statements were lost in the extracted
        source and are reconstructed from this description - confirm.
        '''
        self.__dcobj = self.__dcapi.clogger_create(logname, socktype)
        if not self.__dcobj:
            return -1
        return 0

    def init_mod(self, modid):
        '''
        Initialise a clogger object for module-call reporting.

        @param modid: module id
        @return: 0 on success, -1 on failure (see get_last_err)
        NOTE(review): the return statements were lost in the extracted
        source and are reconstructed from this description - confirm.
        '''
        try:
            self.__dcobj = self.__dcapi.clogger_mcreate(modid)
            if not self.__dcobj:
                return -1
            return 0
        except Exception:
            logM.error(traceback.format_exc(), app_name=self.__log_type)
            return -1

    def write_modulelog(self, sid, ifid, fip, mip, sip, retval, result, delay, req_len=0, rsp_len=0):
        '''
        Report one module-call record.

        @param sid: callee module id, required
        @param ifid: callee interface id, required
        @param fip: reporting IP, required
        @param mip: caller IP, required
        @param sip: callee IP, optional but recommended
        @param retval: return value, optional but recommended
        @param result: result type, required: 0 success, 1 failed, 2 logical
            failure, 3 database exception, 4 network exception, 5 IO
            exception, 6 other exception
        @param delay: call duration in ms
        @param req_len: request length (optional)
        @param rsp_len: response length (optional)
        @return: 0 on success, non-zero on failure
        '''
        if self.__dcobj is None:
            # Never hand a NULL handle to the native library.
            return -1
        ret = self.__dcapi.clogger_writeModulelog(self.__dcobj, sid, ifid, fip, mip, sip, retval, result, delay, req_len, rsp_len)
        return ret

    def get_last_err(self):
        '''
        Return the most recent error message of the native library.
        '''
        return self.__dcapi.clogger_getLastErr(self.__dcobj)

    def __del__(self):
        if self.__dcobj is not None:
            self.__dcapi.clogger_destroy(self.__dcobj)
            self.__dcobj = None
class DcReport(object):
    '''Reports module-call records through CLogger.'''

    # Result types: 0 success, 1 failed, 2 logical failure, 3 database
    # exception, 4 network exception, 5 IO exception, 6 other exception.
    FAILED = 1
    LOGICAL_FAILED = 2
    DATABASE_EXCEPTION = 3
    NET_EXCEPTION = 4
    IO_ERROR = 5
    OTHER_EXCEPTION = 6

    def __init__(self, log_type='default'):
        self.__log_type = log_type

    def report(self, modid, sid, ifid, fip, mip, sip, retval, result, delay, req_len=0, rsp_len=0):
        '''
        Report one module-call record.

        @param modid: caller module id, required
        @param sid: callee module id, required
        @param ifid: callee interface id, required
        @param fip: reporting IP, required
        @param mip: caller IP, required
        @param sip: callee IP, optional but recommended
        @param retval: return value, optional but recommended
        @param result: result type, required (see the class constants)
        @param delay: call duration in ms
        @param req_len: request length (optional)
        @param rsp_len: response length (optional)
        @return: 0 on success, non-zero on failure
        NOTE(review): the return statements were lost in the extracted
        source and are reconstructed from this description - confirm.
        '''
        try:
            clogger = CLogger()
            delay = int(delay)
            ret = clogger.init_mod(modid)
            if ret != 0:
                logM.error("module report init failed!\n", app_name=self.__log_type)
                return -1
            ret = clogger.write_modulelog(sid, ifid, fip, mip, sip, retval, result, delay, req_len, rsp_len)
            if ret != 0:
                logM.error("module_log report failed!\n", app_name=self.__log_type)
                return -1
            return 0
        except Exception:
            logM.error(traceback.format_exc(), app_name=self.__log_type)
            return -1
def module_report(modid, sid, ifid, fip, mip, sip, retval, result, delay, req_len=0, rsp_len=0):
    """Report one in-house module-call record using a throw-away CLogger.

    Reported data becomes visible on the monitoring platform roughly half
    an hour after it is sent.

    @param modid: id of the calling module, required
    @param sid: id of the called module, required
    @param ifid: id of the called interface, required
    @param fip: reporting IP, required
    @param mip: caller IP, required
    @param sip: callee IP, optional (recommended, helps later diagnosis)
    @param retval: return value, optional (recommended, helps later diagnosis)
    @param result: result type, required: 0 success, 1 failure,
                   2 logical failure, 3 database exception,
                   4 network exception, 5 IO exception, 6 other exception
    @param delay: call latency in milliseconds
    @param req_len: request packet length (optional)
    @param rsp_len: response packet length (optional)
    @return: 0 on success, non-zero on failure (-1 when an exception was
             raised while reporting)
    """
    try:
        mod_report = CLogger()
        ret = mod_report.init_mod(modid)
        if ret != 0:
            # Init failures are returned to the caller without logging.
            return ret
        # Write failures are likewise only signalled through the return value.
        return mod_report.write_modulelog(sid, ifid, fip, mip, sip, retval, result, delay, req_len, rsp_len)
    except Exception:
        logM.error(traceback.format_exc())
        return -1
\ No newline at end of file
Index: singleton.py
===================================================================
--- singleton.py (revision 0)
+++ singleton.py (revision 225)
@@ -0,0 +1,53 @@
+#!/usr/bin/env python
+# encoding: utf-8
+@version: ??
+@author: calmwu
+@license: Apache Licence
+@contact:
+@file: singleton.py
+#/blog/singleton-pattern-in-python/
+#/questions/6760685/creating-a-singleton-in-python
+#这里第一个参数必须是cls
def singleton(cls, *args, **kwargs):
    """Class decorator that makes ``cls`` produce one shared instance.

    The first call of the returned factory constructs ``cls`` with the
    arguments of that call; every later call ignores its arguments and
    returns the same object.

    @param cls: the class (or other callable) to wrap; must be the first
                positional argument
    @param args: accepted for backward compatibility, unused
    @param kwargs: accepted for backward compatibility, unused
    @return: a factory function returning the shared instance
    """
    instances = {}

    def getinstance(*args, **kwargs):
        # Construct lazily, on the first call only.
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]

    return getinstance
# example
if __name__ == '__main__':
    @singleton
    class Counter:
        def __init__(self, count, num, k):
            print("----------- input args: %s %s %s" % (count, num, k))
            self.count = count

        def inc(self):
            self.count += 1

    # Only the first call constructs; the second returns the same object.
    counter_1 = Counter(1, 2, 3)
    counter_2 = Counter(8, 6, 3)
    print(id(counter_1))
    print(id(counter_2))

    # Re-wrapping the already-decorated name still yields the same instance,
    # because the inner factory ignores arguments once an instance exists.
    counter = singleton(Counter)(count=6, num=9, k=10)
    counter1 = singleton(Counter)(1, 2, 3)
    counter2 = singleton(Counter)(num=2, count=2, k=3)
    counter3 = singleton(Counter)()
    print(counter == counter1 == counter2)
    print(id(counter))
    print(id(counter1))
    print(id(counter2))
    print(id(counter3))
\ No newline at end of file
Index: func_help.py
===================================================================
--- func_help.py (revision 0)
+++ func_help.py (revision 225)
@@ -0,0 +1,44 @@
+#! /usr/bin/python
+#coding=utf-8
+@version: ??
+@author: calmwu
+@contact:
+@software: PyCharm
+@file: func_help.py
+import pprint
+import re
+#格式化局部变量
def compose_inputargs(local_args):
    """Pretty-format a mapping of local variables, leaving out ``self``.

    @param local_args: mapping of names to values, e.g. the result of locals()
    @return: pprint.pformat() of the mapping without its 'self' entry
    """
    # Work on a filtered copy so the caller's mapping is not modified.
    visible = dict((name, value) for name, value in local_args.items() if name != 'self')
    return pprint.pformat(visible)
def parse_cookie_uin(uin):
    '''
    Extract the numeric uin from a uin cookie string.

    The first run of digits that starts with a non-zero digit is returned,
    which drops any non-digit prefix/suffix and any leading zeros.

    @param uin: the cookie value, a string
    @return: the digits as a string, or False when ``uin`` is not a string
             or contains no such run
    '''
    try:
        # First digit run that begins with 1-9.
        m = re.search(r"\D*([1-9]\d*)", uin)
    except TypeError:
        # Non-string input (None, int, ...) is an invalid uin.
        return False
    if m is None:
        return False
    # group(0) is the whole match, group(1) the bracketed digits.
    return m.group(1) or False
\ No newline at end of file
Index: enum.py
===================================================================
--- enum.py (revision 0)
+++ enum.py (revision 225)
@@ -0,0 +1,36 @@
+#! /usr/bin/python
+#coding=utf-8
+# Copyright (c) 2015, calmwu
+# All rights reserved.
+#python的enum实现
+import pprint
def Enum(*seq, **named):
    """Build a simple enum-like class.

    Positional names are numbered 0, 1, 2, ... in order; keyword arguments
    supply explicit values and override a positional name that is repeated.

    @param seq: attribute names to number automatically
    @param named: attribute names with explicit values
    @return: a new class named 'Enum' carrying those class attributes
    """
    # Pair each positional name with its index.
    class_attrs = dict((name, index) for index, name in enumerate(seq))
    class_attrs.update(named)
    # type() with three arguments creates the class (not a metaclass).
    return type('Enum', (), class_attrs)
if __name__ == "__main__":
    ChannelType = Enum('CH_TYPE_PLAYBAR', 'CH_TYPE_MYAPP', CH_TYPE_BROWSER=10)
    E_Fields_VoucherAppInfo = Enum(
        FIELD_CHANNEL_TYPE='channel_type',
        FIELD_APPLY_TIME='apply_time',
        FIELD_IS_ONQCLOUD='is_onqcloud',
        FIELD_GAME_NAME='game_name',
        FIELD_CHANNEL_APPLEVEL='channel_applevel',
        FIELD_CHANNEL_PLAYBAR_APPLEVEL='channel_playbar_applevel',
        FIELD_VOURCHER_COUNT='voucher_count',
    )
    E_Fidnfo = Enum(APPROVFAL_REJECTED=2, APPROVFAL_PASSED=3)
    # NOTE(review): the original printed an undefined name `t`; ChannelType
    # is assumed to be what was meant.
    print("%s %s" % (type(ChannelType), ChannelType))
    print(E_Fidnfo.APPROVFAL_REJECTED)
Index: system.py
===================================================================
--- system.py (revision 0)
+++ system.py (revision 225)
@@ -0,0 +1,19 @@
+#encoding=utf-8
+import platform
+import socket
def get_ip_address(ifname):
    """Return the IPv4 address of network interface ``ifname``.

    @param ifname: interface name such as 'eth0'; only its first 15
                   characters are used
    @return: the address in dotted-quad form, or the literal string
             "Windows" when the fcntl module cannot be imported
    """
    try:
        import fcntl
        import struct
    except ImportError:
        return "Windows"

    # struct's 's' format needs bytes on Python 3; on Python 2 str is bytes.
    if not isinstance(ifname, bytes):
        ifname = ifname.encode('utf-8')

    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # NOTE(review): the request code and the [20:24] slice were missing
        # from this copy; they follow the usual SIOCGIFADDR recipe — confirm.
        ifreq = fcntl.ioctl(
            s.fileno(),
            0x8915,  # SIOCGIFADDR
            struct.pack('256s', ifname[:15])
        )
    finally:
        # Close the socket on every path.
        s.close()
    return socket.inet_ntoa(ifreq[20:24])
def get_current_sysname():
    """Return the operating-system name reported by platform.system()."""
    return platform.system()
\ No newline at end of file
Index: time_arithmetic.py
===================================================================
--- time_arithmetic.py (revision 0)
+++ time_arithmetic.py (revision 225)
@@ -0,0 +1,73 @@
+#! /usr/bin/python
+#coding=utf-8
+'''
+@version: ??
+@author: lippsu
+@contact:
+@software: PyCharm
+@file: time_arithmetic
+'''
+import time
+import calendar
DAY_TIMESTAMP = 86400  # seconds in one day


class TimeArithmetic(object):
    """Static helpers for arithmetic on Unix timestamps and dates."""

    @staticmethod
    def delta_days(tmsp_a, tmsp_b):
        '''Return (tmsp_a - tmsp_b) expressed in days, as an int.'''
        return int((tmsp_a - tmsp_b) / DAY_TIMESTAMP)

    @staticmethod
    def is_delta_month(tmsp_a, tmsp_b):
        '''Return True when tmsp_a is at least 30 days after tmsp_b.'''
        days = TimeArithmetic.delta_days(tmsp_a, tmsp_b)
        if days < 0:
            return False
        return days >= 30

    @staticmethod
    def get_day(tmsp):
        '''Return the day of the month (local time) of a timestamp.'''
        return time.localtime(tmsp).tm_mday

    @staticmethod
    def is_ist_day(tmsp, ist):
        '''Return True when the timestamp falls on day ``ist`` of its month.'''
        return TimeArithmetic.get_day(tmsp) == ist

    @staticmethod
    def day_pass_n_day(tmsp, n_day):
        '''Return the timestamp ``n_day`` days after ``tmsp``.'''
        return tmsp + DAY_TIMESTAMP * n_day

    @staticmethod
    def timestamp_to_YMDHMS(tmsp):
        '''Format a timestamp as local 'YYYY-mm-dd HH:MM:SS'.'''
        return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(tmsp))

    @staticmethod
    def add_months(date_time, months):
        '''
        Return ``date_time`` moved forward by ``months`` months.

        When the day does not exist in the target month it is clamped to
        that month's last day, e.g. Aug 31 + 1 month = Sep 30.

        @param date_time: an object with year/month/day and replace()
        @param months: number of months to add (may be negative)
        '''
        month_index = date_time.month - 1 + months
        # Floor division keeps the year an int on Python 3 as well.
        year = date_time.year + month_index // 12
        month = month_index % 12 + 1
        day = min(date_time.day, calendar.monthrange(year, month)[1])
        return date_time.replace(year=year, month=month, day=day)

    @staticmethod
    def get_now_stamps():
        '''Return the current time as a Unix timestamp (time.time()).'''
        return time.time()
Index: file_manager.py
===================================================================
--- file_manager.py (revision 0)
+++ file_manager.py (revision 225)
@@ -0,0 +1,183 @@
+#! /usr/bin/python
+#coding=utf-8
+'''
+@version: ??
+@author: lippsu
+@contact:
+@software: PyCharm
+@file: file_namage
+'''
+# with as: /archives/325
+import os
+import shutil
class FileManager(object):
    '''Static file-system helpers; exceptions propagate to the logic layer.'''

    def __init__(self):
        pass

    @staticmethod
    def dir_exist(path):
        '''Return True when ``path`` exists (file or directory).'''
        return os.path.exists(path)

    @staticmethod
    def path_is_file(path):
        '''Return True when ``path`` is an existing regular file.'''
        return os.path.isfile(path)

    @staticmethod
    def path_is_dir(path):
        '''Return True when ``path`` is an existing directory.'''
        return os.path.isdir(path)

    @staticmethod
    def make_dir(path):
        '''
        Create ``path`` (including parents) unless it already exists.

        Surrounding whitespace and trailing backslashes are stripped first.

        @return: True when the directory was created, False when the path
                 already existed
        '''
        path = path.strip().rstrip('\\')
        if os.path.exists(path):
            return False
        os.makedirs(path)
        return True

    @staticmethod
    def delete_dir(path):
        '''Delete a directory and everything below it; no-op if missing.'''
        if not FileManager.dir_exist(path):
            return
        shutil.rmtree(path)

    @staticmethod
    def delete_file(path):
        '''Delete a file; no-op if the path does not exist.'''
        if not FileManager.dir_exist(path):
            return
        os.remove(path)

    @staticmethod
    def delete_dir_sub(parent_path):
        '''Delete every sub-directory and file directly under ``parent_path``.'''
        for entry in os.listdir(parent_path):
            path = os.path.join(parent_path, entry)
            if os.path.isdir(path):
                shutil.rmtree(path)
            elif os.path.isfile(path):
                os.remove(path)

    @staticmethod
    def get_dir_subpath(parent_path):
        '''Return the joined paths of all entries directly under ``parent_path``.'''
        return [os.path.join(parent_path, entry) for entry in os.listdir(parent_path)]

    @staticmethod
    def chmod(path, stats):
        '''Change the mode of ``path`` to ``stats`` (see os.chmod).'''
        os.chmod(path, stats)

    @staticmethod
    def get_file_size(file_path):
        '''Return the file size in bytes.'''
        return os.path.getsize(file_path)

    @staticmethod
    def get_file_ctime(path):
        '''Return os.path.getctime() of a file or directory.'''
        return os.path.getctime(path)

    @staticmethod
    def get_file_stat(path):
        '''Return the os.stat() result for ``path``.'''
        return os.stat(path)

    @staticmethod
    def get_file_name(path):
        '''Return the file name of ``path``, or '' when it is not a file.'''
        if not FileManager.path_is_file(path):
            return ''
        return os.path.basename(path)

    @staticmethod
    def get_file_suffix(path):
        '''
        Return the file suffix without the dot:
        'a.b.c' ret 'c'
        'abc' ret ''
        '.a.b.c' ret 'c'
        '''
        # Look only at the final path component, so a dot in a directory
        # name is not mistaken for a suffix separator.
        parts = os.path.basename(path).split('.')
        if len(parts) > 1:
            return parts[-1]
        return ''

    @staticmethod
    def write_chunk2file(file_path, chunk, offset):
        '''
        Write ``chunk`` into the file at byte ``offset``; the file is
        created when it does not exist.
        '''
        # Existing file: edit in place; missing file: create it.
        mode = 'rb+' if os.path.isfile(file_path) else 'wb'
        with open(file_path, mode) as h_file:
            h_file.seek(offset, 0)
            h_file.write(chunk)

    @staticmethod
    def write_file(file_path, data):
        '''
        Write ``data`` to the file, creating it when missing and
        truncating it when it exists.
        '''
        with open(file_path, 'wb') as h_file:
            h_file.write(data)
if __name__ == '__main__':
    # Manual smoke test; the path is machine-specific.
    test_path = 'F:/tmp/'
    sub_file_list = FileManager.get_dir_subpath(test_path)
    for path in sub_file_list:
        print("%s %s %s" % (path, FileManager.get_file_size(path), FileManager.get_file_ctime(path)))
        print(FileManager.get_file_name(path))
        print(FileManager.get_file_suffix(path))

    FileManager.write_chunk2file(test_path + 'writechunk.txt', 'chunkoffset50', 50)
    FileManager.write_file(test_path + 'writefile.txt', 'justwrite.')

    tmp = test_path + 'delete_test'
    FileManager.make_dir(tmp)
    tmp = test_path + 'delete/ttt/ttt/ttt'
    FileManager.make_dir(tmp)
    tmp = test_path + 'delete'
    FileManager.delete_dir(tmp)
\ No newline at end of file
Index: import_mod.py
===================================================================
--- import_mod.py (revision 0)
+++ import_mod.py (revision 225)
@@ -0,0 +1,22 @@
+#! /usr/bin/python
+#coding=utf-8
+@version: ??
+@author: calmwu
+@contact:
+@software: PyCharm
+@file: import_mod
+#动态的导入模块 xxxx.yyy.zzzz 反馈zzz module
+# /questions/211100/pythons-import-doesnt-work-as-expected
def import_and_get_module(module_name):
    '''
    Import a dotted module path and return the last module in it.

    __import__('xxxx.yyy.zzzz') returns the top-level package ``xxxx``, so
    the remaining components are resolved one by one with getattr.

    @param module_name: dotted module name, e.g. 'xxxx.yyy.zzzz'
    @return: the module object for the last component
    '''
    mod = __import__(module_name)
    for comp_name in module_name.split('.')[1:]:
        mod = getattr(mod, comp_name)
    return mod
Index: inter_error.py
===================================================================
--- inter_error.py (revision 0)
+++ inter_error.py (revision 225)
@@ -0,0 +1,81 @@
+#! /usr/bin/python
+#coding=utf-8
+'''
+@version: ??
+@author: lippsu
+@contact:
+@software: PyCharm
+@file: inter_error
+'''
+class InterError(Exception):
'''System error code definitions.'''
# NOTE(review): in this copy most codes appear as bare names with no value,
# and F_NONE (used as a default below) is not defined; restore the values
# from the original file. Each comment describes the name directly above it.
# Parameters 00-10
E_UNKNOW_ACTION
# Action is not outer interface or not exists
E_KEY_UNEXIST
# missing parameter
E_PARAM_ERR
# parameter is wrong
E_INVALID_PARA
# parameter is invalid
E_UNKNOW_INTERFACE
# interface-name parameter (cgw protocol)
E_ACTION_FORBIDDEN
# the action invoked through this interface name is restricted
E_KEY_CONFLICT
# primary key already exists
# Authentication 10-20
E_PERMISION_ERR
# per-project & collaborator
E_VERIFY_ERR
E_CSRF_ERR
# Interface 20-30
E_INTEFACE_ERR
# interface error
E_HTTP_ERR
# HTTP call error
# database call error
E_DB_KEY_CONFLICT
# primary key already exists (conflict)
E_DB_KEY_UNEXIST
# queried key does not exist
E_REDIS_INIT_ERR
# exception raised when redis initialisation fails
# Configuration 30-40
E_CONFIG_ERR
# configuration item not found
E_NEED_OVER_WRITE
# method must be overridden
E_LIMIT_EXCEEDED
# user-defined resource entries exceed the limit (custom device-model list)
# Other 90-99
E_EXCEPTION_ERR
# native exception raised by the Python runtime
E_UPLOAD_CHUNK_CRC
# CHUNK CRC NOT MATCH
E_UPLOAD_INVALID_APK = 1102
# parse apk failed
E_UPLOAD_INVALID_CHUNK = 1103
# invalid chunk, NEED RETRY
E_UPLOAD_APK_NO_WETEST_SDK = 1104
# APK has not integrated the wetest sdk
E_UPLOAD_SIZE_LIMIT_EXCEEDED = 1105
# uploaded file exceeds the size limit
F_CAN_RETRY
# Stores the code, a message made of the code's name plus str(ex_msg),
# and the flag.
def __init__(self, err_no=E_INTEFACE_ERR, ex_msg='', flag=F_NONE):
self.err_no = err_no
self.err_msg = self.parse_error_code(err_no) + ' ' + str(ex_msg)
self.flag = flag
# Renders both the code and the message with %r.
def __str__(self):
return 'err_no(%r), err_msg(%r)' % (self.err_no,self.err_msg)
# Returns the name of the first class attribute whose value equals err_no,
# or 'UNKNOWN_ERR_CODE' when none matches. Every entry of cls.__dict__ is
# compared, not only the E_* codes.
@classmethod
def parse_error_code(cls, err_no):
for key,value in cls.__dict__.items():
if err_no == value:
return key
return 'UNKNOWN_ERR_CODE'
if __name__ == "__main__":
    # Prints the symbolic name that parse_error_code finds for the code.
    print(InterError.parse_error_code(InterError.E_LIMIT_EXCEEDED))
\ No newline at end of file
(C) 2013 Alibaba Inc. All rights reserved.
Powered by

我要回帖

更多关于 鲁能国际中心 的文章

 

随机推荐