1、涉及aiomysql模块,在MetaPathFinder.find_module中只需要处理aiomysql模块。
其他先忽略,然后确定需要替换aiomysql的功能。从业务上来说,一般我们只需要cursor.execute、cursor.fetchone、cursor.fetchall、cursor.executemany这些主要操作。
2、先cursor.execute的源代码(其他同理),调用self.nextset的方法。
完成上一个请求的数据,然后合并sql语句,最后通过self._query查询。
实例
importimportlib importtime importsys fromfunctoolsimportwraps fromtypingimportcast,Any,Callable,Optional,Tuple,TYPE_CHECKING fromtypesimportModuleType ifTYPE_CHECKING: importaiomysql deffunc_wrapper(func:Callable): @wraps(func) asyncdefwrapper(*args,**kwargs)->Any: start:float=time.time() func_result:Any=awaitfunc(*args,**kwargs) end:float=time.time() #根据_query可以知道,第一格参数是self,第二个参数是sql self:aiomysql.Cursor=args[0] sql:str=args[1] #通过self,我们可以拿到其他的数据 db:str=self._connection.db user:str=self._connection.user host:str=self._connection.host port:str=self._connection.port execute_result:Tuple[Tuple]=self._rows #可以根据自己定义的agent把数据发送到指定的平台,然后我们就可以在平台上看到对应的数据或进行监控了, #这里只是打印一部分数据出来 print({ "sql":sql, "db":db, "user":user, "host":host, "port":port, "result":execute_result, "speedtime":end-start }) returnfunc_result returncast(Callable,wrapper) classMetaPathFinder: @staticmethod deffind_module(fullname:str,path:Optional[str]=None)->Optional["MetaPathLoader"]: iffullname=='aiomysql': #只有aiomysql才进行hook returnMetaPathLoader() else: returnNone classMetaPathLoader: @staticmethod defload_module(fullname:str): iffullnameinsys.modules: returnsys.modules[fullname] #防止递归调用 finder:"MetaPathFinder"=sys.meta_path.pop(0) #导入module module:ModuleType=importlib.import_module(fullname) #针对_query进行hook module.Cursor._query=func_wrapper(module.Cursor._query) sys.meta_path.insert(0,finder) returnmodule asyncdeftest_mysql()->None: importaiomysql pool:aiomysql.Pool=awaitaiomysql.create_pool( host='127.0.0.1',port=3306,user='root',password='123123',db='mysql' ) asyncwithpool.acquire()asconn: asyncwithconn.cursor()ascur: awaitcur.execute("SELECT42;") (r,)=awaitcur.fetchone() assertr==42 pool.close() awaitpool.wait_closed() if__name__=='__main__': sys.meta_path.insert(0,MetaPathFinder()) importasyncio asyncio.run(test_mysql()) #输出示例: #可以看出sql语句与我们输入的一样,db,user,host,port等参数也是,还能知道执行的结果和运行时间 #{'sql':'SELECT42;','db':'mysql','user':'root','host':'127.0.0.1','port':3306,'result':((42,),),'speedtime':0.00045609474182128906}
以上就是python制作探针模块的方法,希望对大家有所帮助。更多Python学习指路:Python基础教程