amamanamam

データベースと仲良くなりたいです

接続要求を受けたときのメインスレッドの働きの話

クライアントから接続要求〜ユーザスレッド作成までのmysqldの御働きをバックトレースを見ながら探検する。 ついでに接続待機している部分にも寄り道する。 なお、スレッドキャッシュを空にするため、mysqldを再起動してから検証をする。

環境

mysql> select version();
+--------------+
| version()    |
+--------------+
| 8.0.28-debug |
+--------------+
1 row in set (0.00 sec)

接続要求が来るまでの間

  クライアントから接続要求がやって来る以前のmysqldのバックトレース

Connection_acceptor<Mysqld_socket_listener>::connection_event_loop(Connection_acceptor<Mysqld_socket_listener> * const this) (/mysql-8.0.28/sql/conn_handler/connection_acceptor.h:65)
mysqld_main(int argc, char ** argv) (/mysql-8.0.28/sql/mysqld.cc:7903)
main(int argc, char ** argv) (/mysql-8.0.28/sql/main.cc:25)

mysqldはクライアントから接続要求を受けるまでは接続待機をしている. 以下のconnection_event_loopでConnection_handler_managerというシングルトンの接続管理クラスを作成した後に、listen_for_connection_event で接続を待ち受ける。具体的にはpollがコールされる(タイムアウト値が-1になっていることから接続を受けるまで待機していることがわかる)

/**
    Connection acceptor loop to accept connections from clients.
  */
  void connection_event_loop() {
    Connection_handler_manager *mgr =
        Connection_handler_manager::get_instance();
    while (!connection_events_loop_aborted()) {
      Channel_info *channel_info = m_listener->listen_for_connection_event();
      if (channel_info != nullptr) mgr->process_new_connection(channel_info);
    }
  }
Channel_info *Mysqld_socket_listener::listen_for_connection_event() {
#ifdef HAVE_POLL
  int retval = poll(&m_poll_info.m_fds[0], m_socket_vector.size(), -1);
#else
  m_select_info.m_read_fds = m_select_info.m_client_fds;
  int retval = select((int)m_select_info.m_max_used_connection,
                      &m_select_info.m_read_fds, 0, 0, 0);
#endif
...

接続要求が来た後

適当なところでブレークポイント を置いて、接続要求があった際のデバックトレースが以下になる

my_thread_create(my_thread_handle * thread, const my_thread_attr_t * attr, my_start_routine func, void * arg) (/mysql-8.0.28/mysys/my_thread.cc:78)
pfs_spawn_thread_vc(PSI_thread_key key, PSI_thread_seqnum seqnum, my_thread_handle * thread, const my_thread_attr_t * attr, void *(*)(void *) start_routine, void * arg) (/mysql-8.0.28/storage/perfschema/pfs.cc:2998)
inline_mysql_thread_create(PSI_thread_key key, unsigned int sequence_number, my_thread_handle * thread, const my_thread_attr_t * attr, my_start_routine start_routine, void * arg) (/mysql-8.0.28/include/mysql/psi/mysql_thread.h:139)
Per_thread_connection_handler::add_connection(Per_thread_connection_handler * const this, Channel_info * channel_info) (/mysql-8.0.28/sql/conn_handler/connection_handler_per_thread.cc:411)
Connection_handler_manager::process_new_connection(Connection_handler_manager * const this, Channel_info * channel_info) (/mysql-8.0.28/sql/conn_handler/connection_handler_manager.cc:259)
Connection_acceptor<Mysqld_socket_listener>::connection_event_loop(Connection_acceptor<Mysqld_socket_listener> * const this) (/mysql-8.0.28/sql/conn_handler/connection_acceptor.h:65)
mysqld_main(int argc, char ** argv) (/mysql-8.0.28/sql/mysqld.cc:7903)
main(int argc, char ** argv) (/mysql-8.0.28/sql/main.cc:25)

Connection_handler_manager::process_new_connection

接続要求を受けた後はChannel_infoというクライアントからの接続情報が作成される(これはAbstractであり具体的にはChannel_info_tcpip_socketやChannel_info_local_socketなど)

そして、予め作成されていたConnection_handler_managerからprocess_new_connectionメソッドが呼び出される。max_connectionsを超えていないか等チェックしてadd_connectionを呼び出す(CONNECTION_ADMIN権限を持つユーザであればこのチェックはスルーされる)。connection_events_loop_abortedはよく分からない。

void Connection_handler_manager::process_new_connection(
    Channel_info *channel_info) {
  if (connection_events_loop_aborted() ||
      !check_and_incr_conn_count(channel_info->is_admin_connection())) {
    channel_info->send_error_and_close_channel(ER_CON_COUNT_ERROR, 0, true);
    delete channel_info;
    return;
  }

  if (m_connection_handler->add_connection(channel_info)) {
    inc_aborted_connects();
    delete channel_info;
  }
}

なお、m_connection_handlerメンバの型はConnection_handlerという接続処理のAbstractクラスである。これはConnectionHandlerManagerインスタンス作成時のinitでthread_handlingの値を見て作成される。

今回の場合はthread_handling=one-thread-per-connectionなので具体的にはPer_thread_connection_handlerインスタンスとなる

bool Connection_handler_manager::init() {
  /*
    This is a static member function.
    Per_thread_connection_handler's static members need to be initialized
    even if One_thread_connection_handler is used instead.
  */
  Per_thread_connection_handler::init();

  Connection_handler *connection_handler = nullptr;
  switch (Connection_handler_manager::thread_handling) {
    case SCHEDULER_ONE_THREAD_PER_CONNECTION:
      connection_handler = new (std::nothrow) Per_thread_connection_handler();
      break;
    case SCHEDULER_NO_THREADS:
      connection_handler = new (std::nothrow) One_thread_connection_handler();
      break;
    default:
      assert(false);
  }
...

Per_thread_connection_handler::add_connection

add_connectionではcheck_idle_thread_and_enqueue_connectionが初めに呼び出される。ここではスレッドキャッシュの確認をしている。もしアイドル状態のスレッドがあればそれ利用する。今回の場合はmysqld再起動後に動かしているのでこの部分はスルーされる。

その後、mysql_thread_createを呼び出してスレッドの作成に取り掛かる。

bool Per_thread_connection_handler::add_connection(Channel_info *channel_info) {
  int error = 0;
  my_thread_handle id;

  DBUG_TRACE;

  // Simulate thread creation for test case before we check thread cache
  DBUG_EXECUTE_IF("fail_thread_create", error = 1; goto handle_error;);

  if (!check_idle_thread_and_enqueue_connection(channel_info)) return false;

  /*
    There are no idle threads avaliable to take up the new
    connection. Create a new thread to handle the connection
  */
  channel_info->set_prior_thr_create_utime();
  error =
      mysql_thread_create(key_thread_one_connection, &id, &connection_attrib,
                          handle_connection, (void *)channel_info);
#ifndef NDEBUG
handle_error:
#endif  // !NDEBUG

  if (error) {
    connection_errors_internal++;
    if (!create_thd_err_log_throttle.log())
      LogErr(ERROR_LEVEL, ER_CONN_PER_THREAD_NO_THREAD, error);
    channel_info->send_error_and_close_channel(ER_CANT_CREATE_THREAD, error,
                                               true);
    Connection_handler_manager::dec_connection_count();
    return true;
  }

  Global_THD_manager::get_instance()->inc_thread_created();
  DBUG_PRINT("info", ("Thread created"));
  return false;
}

inline_mysql_thread_create以降

HAVE_PSI_THREAD_INTERFACEのマクロが定義されていればPSI_THREAD_CALLが呼び出される。デフォルトでは定義されておりCMakeオプションで制御される(DISABLE_PSI_THREAD)。

#define PSI_THREAD_CALL(M) psi_thread_service->M
...
...
static inline int inline_mysql_thread_create(
    PSI_thread_key key [[maybe_unused]],
    unsigned int sequence_number [[maybe_unused]], my_thread_handle *thread,
    const my_thread_attr_t *attr, my_start_routine start_routine, void *arg) {
  int result;
#ifdef HAVE_PSI_THREAD_INTERFACE
  result = PSI_THREAD_CALL(spawn_thread)(key, sequence_number, thread, attr,
                                         start_routine, arg);
#else
  result = my_thread_create(thread, attr, start_routine, arg);
#endif
  return result;
}

PSI_THREAD_CALLで呼び出されたメソッドでは、これから作成するスレッドに親スレッド(現在のスレッド)の情報を伝搬させるための準備をしている。具体的にはPFS_spawn_thread_argの型の変数にメインスレッドの計測情報を詰め込み、それをmy_thread_createに渡している。

/**
  Implementation of the thread instrumentation interface.
  @sa PSI_v2::spawn_thread.
*/
int pfs_spawn_thread_vc(PSI_thread_key key, PSI_thread_seqnum seqnum,
                        my_thread_handle *thread, const my_thread_attr_t *attr,
                        void *(*start_routine)(void *), void *arg) {
  PFS_spawn_thread_arg *psi_arg;
  PFS_thread *parent;

  /* psi_arg can not be global, and can not be a local variable. */
  psi_arg = (PFS_spawn_thread_arg *)my_malloc(
      PSI_NOT_INSTRUMENTED, sizeof(PFS_spawn_thread_arg), MYF(MY_WME));
  if (unlikely(psi_arg == nullptr)) {
    return EAGAIN;
  }

  psi_arg->m_child_key = key;
  psi_arg->m_child_seqnum = seqnum;
  psi_arg->m_child_identity = (arg ? arg : thread);
  psi_arg->m_user_start_routine = start_routine;
  psi_arg->m_user_arg = arg;

  parent = my_thread_get_THR_PFS();
  if (parent != nullptr) {
    /*
      Make a copy of the parent attributes.
      This is required, because instrumentation for this thread (the parent)
      may be destroyed before the child thread instrumentation is created.
    */
    psi_arg->m_thread_internal_id = parent->m_thread_internal_id;

    memcpy(psi_arg->m_username, parent->m_username,
           sizeof(psi_arg->m_username));
    psi_arg->m_username_length = parent->m_username_length;

    memcpy(psi_arg->m_hostname, parent->m_hostname,
           sizeof(psi_arg->m_hostname));
    psi_arg->m_hostname_length = parent->m_hostname_length;
  } else {
    psi_arg->m_thread_internal_id = 0;
    psi_arg->m_username_length = 0;
    psi_arg->m_hostname_length = 0;
  }

  int result = my_thread_create(thread, attr, pfs_spawn_thread, psi_arg);
  if (unlikely(result != 0)) {
    my_free(psi_arg);
  }
  return result;
}

最終的に呼び出されるmy_thread_createではpthread_createがコールされ、新しいスレッドが作成される。

int my_thread_create(my_thread_handle *thread, const my_thread_attr_t *attr,
                     my_start_routine func, void *arg) {
#ifndef _WIN32
  return pthread_create(&thread->thread, attr, func, arg);
#else
  struct thread_start_parameter *par;
  unsigned int stack_size;

  par = (struct thread_start_parameter *)malloc(sizeof(*par));
  if (!par) goto error_return;

  par->func = func;
  par->arg = arg;
  stack_size = attr ? attr->dwStackSize : 0;

  thread->handle =
      (HANDLE)_beginthreadex(NULL, stack_size, win_thread_start, par, 0,
                             (unsigned int *)&thread->thread);

  if (thread->handle) {
    /* Note that JOINABLE is default, so attr == NULL => JOINABLE. */
    if (attr && attr->detachstate == MY_THREAD_CREATE_DETACHED) {
      /*
        Close handles for detached threads right away to avoid leaking
        handles. For joinable threads we need the handle during
        my_thread_join. It will be closed there.
      */
      CloseHandle(thread->handle);
      thread->handle = NULL;
    }
    return 0;
  }

  my_osmaperr(GetLastError());
  free(par);

error_return:
  thread->thread = 0;
  thread->handle = NULL;
  return 1;
#endif
}