当使用fastapi redis的时候出现, AttributeError: module ‘aioredis’ has no attribute ‘create_redis’
错误如下:
INFO: Waiting for application startup.
ERROR: Traceback (most recent call last):
File "D:\XXX\XXX\XXX\venv\lib\site-packages\starlette\routing.py", line 645, in lifespan
async with self.lifespan_context(app):
File "D:\XXX\XXX\XXX\venv\lib\site-packages\starlette\routing.py", line 540, in __aenter__
await self._router.startup()
File "D:\xxx\xxx\xxx\venv\lib\site-packages\starlette\routing.py", line 622, in startup
await handler()
File "D:\xxx\xxx\xxx\main.py", line 90, in startup_event
app.state.redis = await aioredis.create_redis(address=("127.0.0.1", 6379), db=1, encoding="utf-8")
AttributeError: module 'aioredis' has no attribute 'create_redis'
ERROR: Application startup failed. Exiting.
正常下用到了很多依赖,这些依赖不同版本之间有一些区别,可能他们开发时候比较早了,我最近按照指导配置的时候遇到了一些问题,有的是别的库bug,最近才修复,这里记录一下。
解决方法一 降低版本
方法 aioredis.create_redis
从版本2.0开始不再支持,所以需要降低版本:pip install 'aioredis<2.0'
解决方法二 (运行系统代码摘抄,可能有地方需要微调)
conf.py
# -*- coding: utf-8 -*-
# Redis
REDIS_HOST= '127.0.0.1'
REDIS_PORT= 6379
REDIS_PASSWORD = ''
REDIS_DATABASE = 0
REDIS_TIMEOUT = 5
redis.py
import sys
from aioredis import Redis, TimeoutError, AuthenticationError
from .conf import REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, REDIS_DATABASE, REDIS_TIMEOUT
class RedisCli(Redis):
def __init__(self):
super(RedisCli, self).__init__(
host=REDIS_HOST,
port=REDIS_PORT,
password=REDIS_PASSWORD,
db=REDIS_DATABASE,
socket_timeout=REDIS_TIMEOUT,
decode_responses=True # 转码 utf-8
)
async def init_redis_connect(self):
"""
触发初始化连接
:return:
"""
try:
await self.ping()
except TimeoutError:
print("连接redis超时")
sys.exit()
except AuthenticationError:
print("连接redis认证失败")
sys.exit()
except Exception as e:
print('连接redis异常 {}', e)
sys.exit()
# 创建redis连接对象
redis_client = RedisCli()
三、应用
from fastapi import APIRouter
from .redis import redis_client
class Response200(__ResponseBase):
...
redis = APIRouter()
@redis.post('')
async def test_redis():
result = await redis_client.set('test', 'test')
if result:
return Response200(data=result)
@redis.get('')
async def get_redis():
result = await redis_client.get('test')
if result:
return Response200(data=result)
@redis.delete('')
async def test_redis():
result = await redis_client.delete('test')
if result:
return Response200(data=result)
主题授权提示:请在后台主题设置-主题授权-激活主题的正版授权,授权购买:RiTheme官网
声明:本站所有资源版权均属于原作者所有,这里所提供资源均只能用于参考学习用,请勿直接商用。若由于商用引起版权纠纷,一切责任均由使用者承担。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。