#' Cox Proportional Hazards Regression #' #' @export coxp <- function(dataset, time, status, evar, int = "", check = "", form, data_filter = "", arr = "", rows = NULL, envir = parent.frame()) { if (!requireNamespace("survival", quietly = TRUE)) stop("survival package is required but not installed.") attachNamespace("survival") on.exit(detach("package:survival"), add = TRUE) ## ---- 公式入口 ---------------------------------------------------------- if (!missing(form)) { form <- as.formula(format(form)) vars <- all.vars(form) time <- vars[1] status<- vars[2] evar <- vars[-(1:2)] } ## ---- 基础检查 ---------------------------------------------------------- if (time %in% evar || status %in% evar) { return("Time/status variable contained in explanatory variables." %>% add_class("coxp")) } vars <- unique(c(time, status, evar)) df_name <- if (is_string(dataset)) dataset else deparse(substitute(dataset)) dataset <- get_data(dataset, vars, filt = data_filter, arr = arr, rows = rows, envir = envir) ## 状态变量检查与转换 surv_status <- dataset[[status]] if (!is.numeric(surv_status)) { ## 允许 0/1、FALSE/TRUE、factor(未事件/事件) 等常见编码 if (is.factor(surv_status) || is.character(surv_status)) { lv <- unique(surv_status) if (length(lv) != 2) { return("Status variable must be binary (0/1 or two levels)." %>% add_class("coxp")) } ## 统一成 0/1:按字母顺序或因子水平,第二个水平当作“事件=1” dataset[[status]] <- as.numeric(factor(surv_status, levels = lv)) - 1L } else { return("Status variable must be numeric 0/1 or binary factor." %>% add_class("coxp")) } } else { if (!all(unique(surv_status) %in% c(0, 1))) { return("Status variable must contain only 0 and 1." %>% add_class("coxp")) } } ## ---- 构造公式 ---------------------------------------------------------- if (missing(form)) { rhs <- if (length(evar) == 0) "1" else paste(evar, collapse = " + ") if (!is.empty(int)) rhs <- paste(rhs, paste(int, collapse = " + "), sep = " + ") form <- as.formula(paste("Surv(", time, ", ", status, ") ~ ", rhs)) } ## ---- 模型估计 ---------------------------------------------------------- if ("robust" %in% check) { model <- survival::coxph(form, data = dataset, robust = TRUE) } else { model <- survival::coxph(form, data = dataset) } ## 失败模型保护 if (inherits(model, "try-error")) { return("Model estimation failed. Check data separation or collinearity." %>% add_class("coxp")) } ## 基础摘要信息 coef_df <- broom::tidy(model, conf.int = TRUE) # 系数、HR、CI、p n <- nrow(dataset) # 样本量 n_event <- sum(dataset[[status]]) # 事件数 conc <- survival::survConcordance.fit(y = Surv(dataset[[time]], dataset[[status]]), x = predict(model, type = "lp"))$concordance cat("coef:", length(coef(model)), " n=", nrow(dataset), " events=", sum(dataset[[status]]), "\n") ## 打包返回 out <- as.list(environment()) out$model <- model out$df_name <- df_name out$type <- "survival" out$check <- check ## 附加对象 out$coef_df <- coef_df out$n <- n out$n_event <- n_event out$concordance <- conc add_class(out, c("coxp", "model")) } #' @export summary.coxp <- function(object, ...) { if (is.character(object)) return(object) # 检查模型对象有效性 if (!inherits(object$model, "coxph")) { cat("** Invalid Cox model object. **\n") return(invisible(object)) } # 输出基础信息 cat("Cox Proportional Hazards\n") cat("Data:", object$df_name, " N=", object$n, " Events=", object$n_event, "\n") cat("Concordance=", round(object$concordance, 3), "\n\n") # 输出模型summary summary(object$model) invisible(object) } #' @export predict.coxp <- function(object, pred_data = NULL, pred_cmd = "", dec = 3, envir = parent.frame(), ...) { if (is.character(object)) return(object) ## 构造预测数据框 if (is.null(pred_data)) { newdata <- envir$.model_frame # 若无新数据,默认用训练集 } else { newdata <- get_data(pred_data, envir = envir) } if (!is.empty(pred_cmd)) { newdata <- modify_data(newdata, pred_cmd, envir = envir) } ## 线性预测值 + HR lp <- predict(object$model, newdata = newdata, type = "lp") hr <- exp(lp) res <- data.frame(lp = round(lp, dec), hr = round(hr, dec)) attr(res, "pred_type") <- "linear predictor & hazard ratio" res } #' @export print.coxp.predict <- function(x, ..., n = 10) { cat("Cox PH predictions (linear predictor & hazard ratio):\n") print(head(x, n)) invisible(x) }