使用的方法:
r=redis.StrictRedis(host=xxxx, port=xxxx, db=xxxx) r.xxxx()
有了ConnectionPool這個類之后,可以使用如下方法
pool = redis.ConnectionPool(host=xxx, port=xxx, db=xxxx) r = redis.Redis(connection_pool=pool)
這里Redis是StrictRedis的子類
簡單分析如下:
在StrictRedis類的__init__方法中,可以初始化connection_pool這個參數(shù),其對應(yīng)的是一個ConnectionPool的對象:
class StrictRedis(object): ........ def __init__(self, host='localhost', port=6379, db=0, password=None, socket_timeout=None, socket_connect_timeout=None, socket_keepalive=None, socket_keepalive_options=None, connection_pool=None, unix_socket_path=None, encoding='utf-8', encoding_errors='strict', charset=None, errors=None, decode_responses=False, retry_on_timeout=False, ssl=False, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None, ssl_ca_certs=None): if not connection_pool: .......... connection_pool = ConnectionPool(**kwargs) self.connection_pool = connection_pool
在StrictRedis的實例執(zhí)行具體的命令時會調(diào)用execute_command方法,這里可以看到具體實現(xiàn)是從連接池中獲取一個具體的連接,然后執(zhí)行命令,完成后釋放連接:
# COMMAND EXECUTION AND PROTOCOL PARSING def execute_command(self, *args, **options): "Execute a command and return a parsed response" pool = self.connection_pool command_name = args[0] connection = pool.get_connection(command_name, **options) #調(diào)用ConnectionPool.get_connection方法獲取一個連接 try: connection.send_command(*args) #命令執(zhí)行,這里為Connection.send_command return self.parse_response(connection, command_name, **options) except (ConnectionError, TimeoutError) as e: connection.disconnect() if not connection.retry_on_timeout and isinstance(e, TimeoutError): raise connection.send_command(*args) return self.parse_response(connection, command_name, **options) finally: pool.release(connection) #調(diào)用ConnectionPool.release釋放連接
在來看看ConnectionPool類:
class ConnectionPool(object): ........... def __init__(self, connection_class=Connection, max_connections=None, **connection_kwargs): #類初始化時調(diào)用構(gòu)造函數(shù) max_connections = max_connections or 2 ** 31 if not isinstance(max_connections, (int, long)) or max_connections < 0: #判斷輸入的max_connections是否合法 raise ValueError('"max_connections" must be a positive integer') self.connection_class = connection_class #設(shè)置對應(yīng)的參數(shù) self.connection_kwargs = connection_kwargs self.max_connections = max_connections self.reset() #初始化ConnectionPool 時的reset操作 def reset(self): self.pid = os.getpid() self._created_connections = 0 #已經(jīng)創(chuàng)建的連接的計數(shù)器 self._available_connections = [] #聲明一個空的數(shù)組,用來存放可用的連接 self._in_use_connections = set() #聲明一個空的集合,用來存放已經(jīng)在用的連接 self._check_lock = threading.Lock() ....... def get_connection(self, command_name, *keys, **options): #在連接池中獲取連接的方法 "Get a connection from the pool" self._checkpid() try: connection = self._available_connections.pop() #獲取并刪除代表連接的元素,在第一次獲取connectiong時,因為_available_connections是一個空的數(shù)組, 會直接調(diào)用make_connection方法 except IndexError: connection = self.make_connection() self._in_use_connections.add(connection) #向代表正在使用的連接的集合中添加元素 return connection def make_connection(self): #在_available_connections數(shù)組為空時獲取連接調(diào)用的方法 "Create a new connection" if self._created_connections >= self.max_connections: #判斷創(chuàng)建的連接是否已經(jīng)達到最大限制,max_connections可以通過參數(shù)初始化 raise ConnectionError("Too many connections") self._created_connections += 1 #把代表已經(jīng)創(chuàng)建的連接的數(shù)值+1 return self.connection_class(**self.connection_kwargs) #返回有效的連接,默認(rèn)為Connection(**self.connection_kwargs) def release(self, connection): #釋放連接,鏈接并沒有斷開,只是存在鏈接池中 "Releases the connection back to the pool" self._checkpid() if connection.pid != self.pid: return self._in_use_connections.remove(connection) #從集合中刪除元素 self._available_connections.append(connection) #并添加到_available_connections 的數(shù)組中 def disconnect(self): #斷開所有連接池中的鏈接 "Disconnects all connections in the pool" all_conns = chain(self._available_connections, self._in_use_connections) for connection in all_conns: connection.disconnect()
execute_command最終調(diào)用的是Connection.send_command方法,關(guān)閉鏈接為 Connection.disconnect方法,而Connection類的實現(xiàn):
class Connection(object): "Manages TCP communication to and from a Redis server" def __del__(self): #對象刪除時的操作,調(diào)用disconnect釋放連接 try: self.disconnect() except Exception: pass
核心的鏈接建立方法是通過socket模塊實現(xiàn):
def _connect(self): err = None for res in socket.getaddrinfo(self.host, self.port, 0, socket.SOCK_STREAM): family, socktype, proto, canonname, socket_address = res sock = None try: sock = socket.socket(family, socktype, proto) # TCP_NODELAY sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # TCP_KEEPALIVE if self.socket_keepalive: #構(gòu)造函數(shù)中默認(rèn) socket_keepalive=False,因此這里默認(rèn)為短連接 sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) for k, v in iteritems(self.socket_keepalive_options): sock.setsockopt(socket.SOL_TCP, k, v) # set the socket_connect_timeout before we connect sock.settimeout(self.socket_connect_timeout) #構(gòu)造函數(shù)中默認(rèn)socket_connect_timeout=None,即連接為blocking的模式 # connect sock.connect(socket_address) # set the socket_timeout now that we're connected sock.settimeout(self.socket_timeout) #構(gòu)造函數(shù)中默認(rèn)socket_timeout=None return sock except socket.error as _: err = _ if sock is not None: sock.close() .....
關(guān)閉鏈接的方法:
def disconnect(self): "Disconnects from the Redis server" self._parser.on_disconnect() if self._sock is None: return try: self._sock.shutdown(socket.SHUT_RDWR) #先shutdown再close self._sock.close() except socket.error: pass self._sock = None
可以小結(jié)如下
1)默認(rèn)情況下每創(chuàng)建一個Redis實例都會構(gòu)造出一個ConnectionPool實例,每一次訪問redis都會從這個連接池得到一個連接,操作完成后會把該連接放回連接池(連接并沒有釋放),可以構(gòu)造一個統(tǒng)一的ConnectionPool,在創(chuàng)建Redis實例時,可以將該ConnectionPool傳入,那么后續(xù)的操作會從給定的ConnectionPool獲得連接,不會再重復(fù)創(chuàng)建ConnectionPool。
2)默認(rèn)情況下沒有設(shè)置keepalive和timeout,建立的連接是blocking模式的短連接。
3)不考慮底層tcp的情況下,連接池中的連接會在ConnectionPool.disconnect中統(tǒng)一銷毀。
聲明:本網(wǎng)頁內(nèi)容旨在傳播知識,若有侵權(quán)等問題請及時與本網(wǎng)聯(lián)系,我們將在第一時間刪除處理。TEL:177 7030 7066 E-MAIL:11247931@qq.com