@@ -1683,14 +1683,7 @@ def protocol_downgrade(self, host_endpoint, previous_version):
16831683 "http://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.Cluster.protocol_version" , self .protocol_version , new_version , host_endpoint )
16841684 self .protocol_version = new_version
16851685
1686- def _add_resolved_hosts (self ):
1687- for endpoint in self .endpoints_resolved :
1688- host , new = self .add_host (endpoint , signal = False )
1689- if new :
1690- host .set_up ()
1691- for listener in self .listeners :
1692- listener .on_add (host )
1693-
1686+ def _populate_hosts (self ):
16941687 self .profile_manager .populate (
16951688 weakref .proxy (self ), self .metadata .all_hosts ())
16961689 self .load_balancing_policy .populate (
@@ -1718,16 +1711,9 @@ def connect(self, keyspace=None, wait_for_all_pools=False):
17181711 self .connection_class .initialize_reactor ()
17191712 _register_cluster_shutdown (self )
17201713
1721- self ._add_resolved_hosts ()
1722-
17231714 try :
17241715 self .control_connection .connect ()
1725-
1726- # we set all contact points up for connecting, but we won't infer state after this
1727- for endpoint in self .endpoints_resolved :
1728- h = self .metadata .get_host (endpoint )
1729- if h and self .profile_manager .distance (h ) == HostDistance .IGNORED :
1730- h .is_up = None
1716+ self ._populate_hosts ()
17311717
17321718 log .debug ("Control connection created" )
17331719 except Exception :
@@ -3535,20 +3521,18 @@ def _set_new_connection(self, conn):
35353521 log .debug ("[control connection] Closing old connection %r, replacing with %r" , old , conn )
35363522 old .close ()
35373523
3538- def _connect_host_in_lbp (self ):
3524+ def _try_connect_to_hosts (self ):
35393525 errors = {}
3540- lbp = (
3541- self ._cluster .load_balancing_policy
3542- if self ._cluster ._config_mode == _ConfigMode .LEGACY else
3543- self ._cluster ._default_load_balancing_policy
3544- )
35453526
3546- for host in lbp .make_query_plan ():
3527+ lbp = self ._cluster .load_balancing_policy \
3528+ if self ._cluster ._config_mode == _ConfigMode .LEGACY else self ._cluster ._default_load_balancing_policy
3529+
3530+ for endpoint in chain ((host .endpoint for host in lbp .make_query_plan ()), self ._cluster .endpoints_resolved ):
35473531 try :
3548- return (self ._try_connect (host . endpoint ), None )
3532+ return (self ._try_connect (endpoint ), None )
35493533 except Exception as exc :
3550- errors [str (host . endpoint )] = exc
3551- log .warning ("[control connection] Error connecting to %s:" , host , exc_info = True )
3534+ errors [str (endpoint )] = exc
3535+ log .warning ("[control connection] Error connecting to %s:" , endpoint , exc_info = True )
35523536 if self ._is_shutdown :
35533537 raise DriverException ("[control connection] Reconnection in progress during shutdown" )
35543538
@@ -3563,16 +3547,16 @@ def _reconnect_internal(self):
35633547 to the exception that was raised when an attempt was made to open
35643548 a connection to that host.
35653549 """
3566- (conn , _ ) = self ._connect_host_in_lbp ()
3550+ (conn , _ ) = self ._try_connect_to_hosts ()
35673551 if conn is not None :
35683552 return conn
35693553
35703554 # Try to re-resolve hostnames as a fallback when all hosts are unreachable
35713555 self ._cluster ._resolve_hostnames ()
35723556
3573- self ._cluster ._add_resolved_hosts ()
3557+ self ._cluster ._populate_hosts ()
35743558
3575- (conn , errors ) = self ._connect_host_in_lbp ()
3559+ (conn , errors ) = self ._try_connect_to_hosts ()
35763560 if conn is not None :
35773561 return conn
35783562
@@ -3813,68 +3797,8 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
38133797 cluster_name = local_row ["cluster_name" ]
38143798 self ._cluster .metadata .cluster_name = cluster_name
38153799
3816- partitioner = local_row .get ("partitioner" )
3817- tokens = local_row .get ("tokens" )
3818-
3819- host = self ._cluster .metadata .get_host (connection .original_endpoint )
3820- if host :
3821- datacenter = local_row .get ("data_center" )
3822- rack = local_row .get ("rack" )
3823- self ._update_location_info (host , datacenter , rack )
3824-
3825- # support the use case of connecting only with public address
3826- if isinstance (self ._cluster .endpoint_factory , SniEndPointFactory ):
3827- new_endpoint = self ._cluster .endpoint_factory .create (local_row )
3828-
3829- if new_endpoint .address :
3830- host .endpoint = new_endpoint
3831-
3832- host .host_id = local_row .get ("host_id" )
3833-
3834- found_host_ids .add (host .host_id )
3835- found_endpoints .add (host .endpoint )
3836-
3837- host .listen_address = local_row .get ("listen_address" )
3838- host .listen_port = local_row .get ("listen_port" )
3839- host .broadcast_address = _NodeInfo .get_broadcast_address (local_row )
3840- host .broadcast_port = _NodeInfo .get_broadcast_port (local_row )
3841-
3842- host .broadcast_rpc_address = _NodeInfo .get_broadcast_rpc_address (local_row )
3843- host .broadcast_rpc_port = _NodeInfo .get_broadcast_rpc_port (local_row )
3844- if host .broadcast_rpc_address is None :
3845- if self ._token_meta_enabled :
3846- # local rpc_address is not available, use the connection endpoint
3847- host .broadcast_rpc_address = connection .endpoint .address
3848- host .broadcast_rpc_port = connection .endpoint .port
3849- else :
3850- # local rpc_address has not been queried yet, try to fetch it
3851- # separately, which might fail because C* < 2.1.6 doesn't have rpc_address
3852- # in system.local. See CASSANDRA-9436.
3853- local_rpc_address_query = QueryMessage (
3854- query = maybe_add_timeout_to_query (self ._SELECT_LOCAL_NO_TOKENS_RPC_ADDRESS , self ._metadata_request_timeout ),
3855- consistency_level = ConsistencyLevel .ONE )
3856- success , local_rpc_address_result = connection .wait_for_response (
3857- local_rpc_address_query , timeout = self ._timeout , fail_on_error = False )
3858- if success :
3859- row = dict_factory (
3860- local_rpc_address_result .column_names ,
3861- local_rpc_address_result .parsed_rows )
3862- host .broadcast_rpc_address = _NodeInfo .get_broadcast_rpc_address (row [0 ])
3863- host .broadcast_rpc_port = _NodeInfo .get_broadcast_rpc_port (row [0 ])
3864- else :
3865- host .broadcast_rpc_address = connection .endpoint .address
3866- host .broadcast_rpc_port = connection .endpoint .port
3867-
3868- host .release_version = local_row .get ("release_version" )
3869- host .dse_version = local_row .get ("dse_version" )
3870- host .dse_workload = local_row .get ("workload" )
3871- host .dse_workloads = local_row .get ("workloads" )
3872-
3873- if partitioner and tokens :
3874- token_map [host ] = tokens
3800+ peers_result .insert (0 , local_row )
38753801
3876- self ._cluster .metadata .update_host (host , old_endpoint = connection .endpoint )
3877- connection .original_endpoint = connection .endpoint = host .endpoint
38783802 # Check metadata.partitioner to see if we haven't built anything yet. If
38793803 # every node in the cluster was in the contact points, we won't discover
38803804 # any new nodes, so we need this additional check. (See PYTHON-90)
@@ -4173,8 +4097,9 @@ def _get_peers_query(self, peers_query_type, connection=None):
41734097 query_template = (self ._SELECT_SCHEMA_PEERS_TEMPLATE
41744098 if peers_query_type == self .PeersQueryType .PEERS_SCHEMA
41754099 else self ._SELECT_PEERS_NO_TOKENS_TEMPLATE )
4176- host_release_version = self ._cluster .metadata .get_host (connection .original_endpoint ).release_version
4177- host_dse_version = self ._cluster .metadata .get_host (connection .original_endpoint ).dse_version
4100+ original_endpoint_host = self ._cluster .metadata .get_host (connection .original_endpoint )
4101+ host_release_version = None if original_endpoint_host is None else original_endpoint_host .release_version
4102+ host_dse_version = None if original_endpoint_host is None else original_endpoint_host .dse_version
41784103 uses_native_address_query = (
41794104 host_dse_version and Version (host_dse_version ) >= self ._MINIMUM_NATIVE_ADDRESS_DSE_VERSION )
41804105
0 commit comments