场景:
一个系统的数据同步出了问题,与源系统的数据不一致,需要对无效的数据做标记删除。但是,生产数据库与办公网络隔离,只能在生产服务器上操作,于是写了下面这个命令行(cmd)程序。
直接上代码:
import cn.hutool.core.io.FileUtil;
import cn.hutool.db.Db;
import cn.hutool.db.Entity;
import cn.hutool.db.ds.simple.SimpleDataSource;
import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class CmdToolDbCtToBatchSql {
/**
 * Table names used when this tool builds its SQL statements.
 * Pure constants holder: it is final and cannot be instantiated.
 */
public static final class ConstName {
    /** Table read on the CT side (columns ID, USER_ID, DEL_FLAG, USER_STATE). */
    public static final String CT_TABLE_NAME = "USER_INFORMATION";
    /** UC-side table queried for {@code main_info_guid} by {@code sync_id}. */
    public static final String UC_MAIN_TABLE_NAME = "ag_tag_customer_info";
    /** Prefix of the UC-side account tables; the shard suffix is appended to it. */
    public static final String UC_UER_TABLE_NAME = "ag_account_info_";

    private ConstName() {
        // constants only, no instances
    }
}
/**
 * In-memory index of CT-side user rows, keyed by user id.
 */
public static class CtUserInfoDataContainer {
    // The enclosing class reads this field directly (size reporting), so its name is kept.
    private final Map<Long, CtUserInfoDataItem> map;

    public CtUserInfoDataContainer() {
        map = new HashMap<>();
    }

    /**
     * Registers an item; when an item with the same user id is already present
     * the existing one is kept.
     *
     * @param item row to register, must not be null
     */
    public void addItem(CtUserInfoDataItem item) {
        map.putIfAbsent(item.getUserId(), item);
    }

    /**
     * Looks up a row by the textual form of its user id.
     *
     * @param userId decimal user id; surrounding whitespace is ignored
     * @return the matching row, or {@code null} when the id is null, blank,
     *         not a decimal number, or unknown
     */
    public CtUserInfoDataItem findItem(String userId) {
        if (userId == null) {
            return null;
        }
        // A padded id such as " 123" must still hit the row for 123; otherwise the
        // caller treats the record as absent on the CT side.
        String text = userId.trim();
        if (text.isEmpty()) {
            return null;
        }
        try {
            return map.get(Long.parseLong(text, 10));
        } catch (NumberFormatException err) {
            // Non-numeric ids cannot match any key; report once per call without a full stack trace.
            System.err.println("findItem: userId is not a decimal number: " + userId);
            return null;
        }
    }
}
/**
 * One CT-side user row: running row number, group id, user id and the
 * delete-flag / user-state values as text.
 */
public static class CtUserInfoDataItem {
    private int rowNum;
    private long groupId;
    private long userId;
    private String delFlag;
    private String userState;

    /**
     * Builds an item from a comma-separated line in the order
     * rowNum, groupId, userId, delFlag, userState. Fields missing at the end
     * of the line keep their default values.
     *
     * @param input comma-separated text
     * @return the parsed item, or {@code null} when input is null or empty
     * @throws NumberFormatException when a numeric field is not a decimal number
     */
    public static CtUserInfoDataItem createItem(String input) {
        if (input == null || input.isEmpty()) {
            return null;
        }
        final String[] parts = input.split(",");
        final int count = parts.length;
        final CtUserInfoDataItem item = new CtUserInfoDataItem();
        if (count >= 1) {
            item.setRowNum(Integer.parseInt(parts[0], 10));
        }
        if (count >= 2) {
            item.setGroupId(Long.parseLong(parts[1], 10));
        }
        if (count >= 3) {
            item.setUserId(Long.parseLong(parts[2], 10));
        }
        if (count >= 4) {
            item.setDelFlag(parts[3]);
        }
        if (count >= 5) {
            item.setUserState(parts[4]);
        }
        return item;
    }

    public int getRowNum() {
        return this.rowNum;
    }

    public void setRowNum(int rowNum) {
        this.rowNum = rowNum;
    }

    public long getGroupId() {
        return this.groupId;
    }

    public void setGroupId(long groupId) {
        this.groupId = groupId;
    }

    public long getUserId() {
        return this.userId;
    }

    public void setUserId(long userId) {
        this.userId = userId;
    }

    public String getDelFlag() {
        return this.delFlag;
    }

    public void setDelFlag(String delFlag) {
        this.delFlag = delFlag;
    }

    public String getUserState() {
        return this.userState;
    }

    public void setUserState(String userState) {
        this.userState = userState;
    }
}
/**
 * One UC-side account row as read from the sharded account table:
 * primary key, account guid, sync id and the three status columns as text.
 */
public static class UcUserInfoDataItem {
    private long pkId;
    private String accountInfoGuid;
    private String syncId;
    private String dataStatus;
    private String delFlag;
    private String userState;

    // --- accessors ---

    public long getPkId() {
        return this.pkId;
    }

    public String getAccountInfoGuid() {
        return this.accountInfoGuid;
    }

    public String getSyncId() {
        return this.syncId;
    }

    public String getDataStatus() {
        return this.dataStatus;
    }

    public String getDelFlag() {
        return this.delFlag;
    }

    public String getUserState() {
        return this.userState;
    }

    // --- mutators ---

    public void setPkId(long pValue) {
        pkId = pValue;
    }

    public void setAccountInfoGuid(String pValue) {
        accountInfoGuid = pValue;
    }

    public void setSyncId(String pValue) {
        syncId = pValue;
    }

    public void setDataStatus(String pValue) {
        dataStatus = pValue;
    }

    public void setDelFlag(String pValue) {
        delFlag = pValue;
    }

    public void setUserState(String pValue) {
        userState = pValue;
    }
}
/**
 * A group of UC rows that all receive the same target values
 * (data status, delete flag, user state).
 */
public static class UpdateDataItem {
    private String dataStatus;
    private String delFlag;
    private String userState;
    private List<UcUserInfoDataItem> list;

    // --- accessors ---

    public String getDataStatus() {
        return this.dataStatus;
    }

    public String getDelFlag() {
        return this.delFlag;
    }

    public String getUserState() {
        return this.userState;
    }

    /** @return the rows belonging to this group (the stored list itself, not a copy) */
    public List<UcUserInfoDataItem> getList() {
        return this.list;
    }

    // --- mutators ---

    public void setDataStatus(String dataStatus) {
        this.dataStatus = dataStatus;
    }

    public void setDelFlag(String delFlag) {
        this.delFlag = delFlag;
    }

    public void setUserState(String userState) {
        this.userState = userState;
    }

    public void setList(List<UcUserInfoDataItem> list) {
        this.list = list;
    }
}
/**
 * Null-safe trim.
 *
 * @param data text to trim, may be null
 * @return the trimmed text, or an empty string when data is null
 */
public static String strTrim(String data) {
    if (data == null) {
        return "";
    }
    return data.trim();
}
/**
 * Collects the pending updates for one shard table and one main-info guid,
 * grouped by the target values (data status, delete flag, user state).
 */
public static class UpdateDataContainer {
    private String tableSharding;
    private String mainInfoGuid;
    private List<UpdateDataItem> itemList;

    /**
     * Creates an empty container.
     *
     * @param pTableSharding shard suffix of the account table
     * @param pMainInfoGuid  main-info guid the updates are restricted to
     * @return a container with an empty group list
     */
    public static UpdateDataContainer createItem(String pTableSharding, String pMainInfoGuid) {
        UpdateDataContainer result = new UpdateDataContainer();
        result.setTableSharding(pTableSharding);
        result.setMainInfoGuid(pMainInfoGuid);
        result.setItemList(new ArrayList<>());
        return result;
    }

    /**
     * Returns the group for the given target values, creating it on first use.
     * Values are normalized (null becomes "", whitespace trimmed) before both
     * the lookup and the insert, so null or padded inputs land in one group
     * instead of producing a new group on every call.
     *
     * @param dataStatus target data status, may be null
     * @param delFlag    target delete flag, may be null
     * @param userState  target user state, may be null
     * @return the existing or newly created group, never null
     */
    public UpdateDataItem findOrCreate(String dataStatus, String delFlag, String userState) {
        final String status = strTrim(dataStatus);
        final String del = strTrim(delFlag);
        final String state = strTrim(userState);
        for (UpdateDataItem each : getItemList()) {
            // Normalized keys are non-null, so they go on the left of equals().
            if (status.equals(each.getDataStatus())
                    && del.equals(each.getDelFlag())
                    && state.equals(each.getUserState())) {
                return each;
            }
        }
        UpdateDataItem item = new UpdateDataItem();
        item.setDataStatus(status);
        item.setDelFlag(del);
        item.setUserState(state);
        item.setList(new ArrayList<>());
        getItemList().add(item);
        return item;
    }

    public String getTableSharding() {
        return tableSharding;
    }

    public void setTableSharding(String tableSharding) {
        this.tableSharding = tableSharding;
    }

    public String getMainInfoGuid() {
        return mainInfoGuid;
    }

    public void setMainInfoGuid(String mainInfoGuid) {
        this.mainInfoGuid = mainInfoGuid;
    }

    public List<UpdateDataItem> getItemList() {
        return itemList;
    }

    public void setItemList(List<UpdateDataItem> itemList) {
        this.itemList = itemList;
    }
}
/**
 * @return the directory path {@code /run}
 */
protected static String rootDir() {
    final String dir = "/run";
    return dir;
}
/**
 * Reference command line for compiling this tool.
 * The source file name matches this class, {@code CmdToolDbCtToBatchSql}.
 *
 * @return the javac command as a single string
 */
protected static String javacCmd() {
    return "/app/jdk-11.0.11/bin/javac -cp .:hutool-all-5.8.38.jar:mysql-connector-j-8.0.31.jar CmdToolDbCtToBatchSql.java";
}
/**
 * Reference command line for running this tool (arguments not included).
 * The main class name matches this class, {@code CmdToolDbCtToBatchSql}.
 *
 * @return the java command as a single string
 */
protected static String javaCmd() {
    return "/app/jdk-11.0.11/bin/java -cp .:hutool-all-5.8.38.jar:mysql-connector-j-8.0.31.jar CmdToolDbCtToBatchSql";
}
/**
 * Creates the data source for the CT-side MySQL database.
 *
 * @param pwd password of the fixed database user
 * @return a new {@code SimpleDataSource} for the CT database
 */
protected static DataSource createDataSourceForCt(String pwd) {
    final String driver = "com.mysql.cj.jdbc.Driver";
    return new SimpleDataSource("jdbc:mysql://app.one.com:3306/one", "oneUser", pwd, driver);
}
/**
 * Creates the data source for the UC-side MySQL database.
 *
 * @param pwd password of the fixed database user
 * @return a new {@code SimpleDataSource} for the UC database
 */
protected static DataSource createDataSourceForUc(String pwd) {
    final String driver = "com.mysql.cj.jdbc.Driver";
    return new SimpleDataSource("jdbc:mysql://app.two.com:3306/two", "twoUser", pwd, driver);
}
/**
 * Built-in list of main sync ids, used when none are given on the command line.
 *
 * @return the ids, one per line, each line terminated by a line feed
 */
protected static String getMainSyncIdStrList() {
    final String[] ids = {"1", "29991", "29992"};
    final StringBuilder text = new StringBuilder();
    for (String id : ids) {
        text.append(id).append('\n');
    }
    return text.toString();
}
/**
 * Loads all CT-side rows whose ID column equals the given main sync id and
 * indexes them by USER_ID. Rows are read in pages ordered by USER_ID, each
 * page starting after the largest USER_ID seen so far (starting from 0).
 *
 * @param mainSyncId   decimal value matched against the ID column; it is parsed to a
 *                     number and bound as a statement parameter, never concatenated into SQL
 * @param dataSourceCt CT-side data source
 * @return the populated container (empty when no row matches)
 * @throws NumberFormatException when mainSyncId (or a returned ID / USER_ID) is not a decimal number
 * @throws Exception             on database errors
 */
protected static CtUserInfoDataContainer createCtUserInfoDataContainer(String mainSyncId, DataSource dataSourceCt) throws Exception {
    final int pageSize = 50000;
    // Parsing up front rejects anything that is not a plain number before it reaches the database.
    final long mainId = Long.parseLong(strTrim(mainSyncId), 10);
    final String pageSql = "select ID,USER_ID,DEL_FLAG,USER_STATE from " + ConstName.CT_TABLE_NAME + " where \n" +
            " USER_ID > ?\n" +
            " AND ID = ?\n" +
            " order by USER_ID ASC\n" +
            " LIMIT " + pageSize;
    final CtUserInfoDataContainer result = new CtUserInfoDataContainer();
    final Db db = Db.use(dataSourceCt);
    long gtMinId = 0L;
    int addNum = 0;
    while (true) {
        List<Entity> dataList = db.query(pageSql, gtMinId, mainId);
        if (dataList.isEmpty()) {
            break;
        }
        for (Entity each : dataList) {
            ++addNum;
            long vId = Long.parseLong(each.getStr("ID"));
            long userId = Long.parseLong(each.getStr("USER_ID"));
            // Keyset pagination: the next page starts after the largest USER_ID seen.
            if (gtMinId < userId) {
                gtMinId = userId;
            }
            CtUserInfoDataItem item = new CtUserInfoDataItem();
            item.setRowNum(addNum);
            item.setUserId(userId);
            item.setGroupId(vId);
            item.setDelFlag(each.getStr("DEL_FLAG"));
            item.setUserState(each.getStr("USER_STATE"));
            result.addItem(item);
        }
        // A short page means there is nothing left; skip the extra empty query.
        if (dataList.size() < pageSize) {
            break;
        }
    }
    return result;
}
/**
 * Looks up the main-info guid for a main sync id in the UC main table.
 *
 * @param mainSyncId   decimal sync id; it is parsed to a number and bound as a
 *                     statement parameter, never concatenated into SQL
 * @param dataSourceUc UC-side data source
 * @return the {@code main_info_guid} of the first matching row (as stored, possibly null),
 *         or an empty string when no row matches
 * @throws NumberFormatException when mainSyncId is not a decimal number
 * @throws Exception             on database errors
 */
protected static String getMainInfoGuid(String mainSyncId, DataSource dataSourceUc) throws Exception {
    final long syncId = Long.parseLong(strTrim(mainSyncId), 10);
    final String runSql = "select main_info_guid from " + ConstName.UC_MAIN_TABLE_NAME + " \n" +
            "where \n" +
            "sync_id = ?" +
            " LIMIT 1";
    List<Entity> dataList = Db.use(dataSourceUc).query(runSql, syncId);
    if (dataList.isEmpty()) {
        return "";
    }
    return dataList.get(0).getStr("main_info_guid");
}
/**
 * Derives the shard suffix from a guid: its last two characters are read as a
 * decimal number and rendered without leading zeros.
 *
 * @param guid guid text, must not be null
 * @return the shard suffix, or "0" when the guid has fewer than two characters
 * @throws NumberFormatException when the last two characters are not a decimal number
 */
protected static String toTableSharding(String guid) {
    final int len = guid.length();
    if (len >= 2) {
        final String tail = guid.substring(len - 2);
        return String.valueOf(Integer.parseInt(tail, 10));
    }
    return "0";
}
/**
 * Builds the paged select over one UC account shard table.
 *
 * @param tableSharding shard suffix appended to the account table prefix
 * @param mainInfoGuid  value compared with {@code ct_main_info_guid}
 * @param gtMinId       only rows with an id greater than this are selected
 * @return the select statement, ordered by id and limited to 10000 rows
 */
protected static String toUcSql(String tableSharding, String mainInfoGuid, long gtMinId) {
    final StringBuilder sql = new StringBuilder();
    sql.append("select id,account_info_guid,sync_id,data_status,delete_flag, user_state from ");
    sql.append(ConstName.UC_UER_TABLE_NAME).append(tableSharding).append("\n");
    sql.append("where \n");
    sql.append("ct_main_info_guid = '").append(mainInfoGuid).append("'\n");
    sql.append("and id > ").append(gtMinId).append("\n");
    sql.append("order by id asc LIMIT 10000");
    return sql.toString();
}
/**
 * Builds the row-count select over one UC account shard table.
 *
 * @param tableSharding shard suffix appended to the account table prefix
 * @param mainInfoGuid  value compared with {@code ct_main_info_guid}
 * @return the count statement; the count column is aliased {@code data_count}
 */
protected static String toUcCountSql(String tableSharding, String mainInfoGuid) {
    final StringBuilder sql = new StringBuilder("select count(*) as data_count from ");
    sql.append(ConstName.UC_UER_TABLE_NAME).append(tableSharding).append("\n");
    sql.append("where \n");
    sql.append("ct_main_info_guid = '").append(mainInfoGuid).append("'");
    return sql.toString();
}
/**
 * Counts the UC account rows of one shard that belong to a main-info guid.
 *
 * @param tableSharding shard suffix of the account table
 * @param mainInfoGuid  main-info guid; null or empty yields 0 without querying
 * @param dataSourceUc  UC-side data source
 * @return the row count, or 0 when the guid is missing or the query returns no row
 * @throws Exception on database errors
 */
protected static int ucCountSql(String tableSharding, String mainInfoGuid, DataSource dataSourceUc) throws Exception {
    final boolean hasGuid = mainInfoGuid != null && !mainInfoGuid.isEmpty();
    if (!hasGuid) {
        return 0;
    }
    final List<Entity> rows = Db.use(dataSourceUc).query(toUcCountSql(tableSharding, mainInfoGuid));
    if (rows.isEmpty()) {
        return 0;
    }
    // Only the first row is relevant.
    final Entity first = rows.get(0);
    return Integer.parseInt(first.getStr("data_count"), 10);
}
/**
 * Renders a one-line description of a UC row for log output and SQL comments.
 *
 * @param mainSyncId          main sync id currently being processed
 * @param ucUserInfoDataItemV the UC row to describe
 * @return text of the form {@code mainSyncId:..|syncId:..|id:..|<accountInfoGuid>}
 */
protected static String ucUserInfoDataItemShow(String mainSyncId, UcUserInfoDataItem ucUserInfoDataItemV) {
    final StringBuilder text = new StringBuilder();
    text.append("mainSyncId:").append(mainSyncId);
    text.append("|syncId:").append(ucUserInfoDataItemV.getSyncId());
    text.append("|id:").append(ucUserInfoDataItemV.getPkId());
    text.append("|").append(ucUserInfoDataItemV.getAccountInfoGuid());
    return text.toString();
}
/**
 * Builds the update statement for a single UC account row. The row is
 * addressed by id, main-info guid and account guid together.
 *
 * @param tableSharding       shard suffix of the account table
 * @param mainInfoGuid        main-info guid the row must belong to
 * @param ucUserInfoDataItemV the row to update (id and account guid are used)
 * @param dataStatus          new data status, written as a quoted value
 * @param deleteFlag          new delete flag, written unquoted
 * @param userState           new user state, written unquoted
 * @return the update statement terminated by a semicolon
 */
protected static String updateAgAccountInfo(String tableSharding, String mainInfoGuid, UcUserInfoDataItem ucUserInfoDataItemV, String dataStatus, String deleteFlag, String userState) {
    final String setClause = String.join("\n",
            "SET",
            "data_status = '" + dataStatus + "',",
            "delete_flag = " + deleteFlag + ",",
            "user_state = " + userState + ",",
            "update_user = 'dev',",
            "update_time = now()");
    final String whereClause = String.join("\n",
            "where ",
            "id = " + ucUserInfoDataItemV.getPkId(),
            "and ct_main_info_guid = '" + mainInfoGuid + "'",
            "and account_info_guid = '" + ucUserInfoDataItemV.getAccountInfoGuid() + "';");
    return "update " + ConstName.UC_UER_TABLE_NAME + tableSharding + "\n" + setClause + "\n" + whereClause;
}
/**
 * Tells whether a CT row has delete flag "0" and user state "2".
 *
 * @param ctUserInfoDataItemV the CT row to inspect
 * @return true only when both values match
 */
protected static boolean dataIsOk(CtUserInfoDataItem ctUserInfoDataItemV) {
    if (!"0".equalsIgnoreCase(ctUserInfoDataItemV.getDelFlag())) {
        return false;
    }
    return "2".equalsIgnoreCase(ctUserInfoDataItemV.getUserState());
}
/**
 * Compares one UC row with its CT counterpart and returns the SQL needed to
 * bring the UC row in line: a comment line followed by a single-row update.
 * <ul>
 *   <li>UC row already has status "check-delete": nothing to do.</li>
 *   <li>No CT counterpart: mark the row "check-delete", keeping its own flag and state.</li>
 *   <li>Counterpart found but status, delete flag or user state differ
 *       (a null UC value counts as different): update to the CT values.</li>
 * </ul>
 *
 * @return the comment and update statement, each followed by a line feed,
 *         or an empty string when no change is needed
 */
protected static String checkData(String mainSyncId, String tableSharding, String mainInfoGuid, CtUserInfoDataContainer ctUserInfoDataContainerV, UcUserInfoDataItem ucUserInfoDataItemV) {
    final CtUserInfoDataItem ctItem = ctUserInfoDataContainerV.findItem(ucUserInfoDataItemV.getSyncId());
    final String ucStatus = ucUserInfoDataItemV.getDataStatus();
    if ("check-delete".equals(ucStatus)) {
        return "";
    }
    final String lineFeed = "\n";
    if (ctItem == null) {
        final String markSql = updateAgAccountInfo(tableSharding, mainInfoGuid, ucUserInfoDataItemV,
                "check-delete", ucUserInfoDataItemV.getDelFlag(), ucUserInfoDataItemV.getUserState());
        return "-- more.data: " + ucUserInfoDataItemShow(mainSyncId, ucUserInfoDataItemV) + " --" + lineFeed
                + markSql + lineFeed;
    }
    final String expectedStatus = dataIsOk(ctItem) ? "normal" : "disabled";
    final String ucDel = ucUserInfoDataItemV.getDelFlag();
    final String ucState = ucUserInfoDataItemV.getUserState();
    final boolean inSync = expectedStatus.equals(ucStatus)
            && ucDel != null
            && ucDel.equals(ctItem.getDelFlag())
            && ucState != null
            && ucState.equals(ctItem.getUserState());
    if (inSync) {
        return "";
    }
    final String fixSql = updateAgAccountInfo(tableSharding, mainInfoGuid, ucUserInfoDataItemV,
            expectedStatus, ctItem.getDelFlag(), ctItem.getUserState());
    return "-- update.data: " + ucUserInfoDataItemShow(mainSyncId, ucUserInfoDataItemV) + " --" + lineFeed
            + fixSql + lineFeed;
}
/**
 * Compares one UC row with its CT counterpart and, when the UC row needs a
 * change, files it into the matching group of the update container.
 * <ul>
 *   <li>UC row already has status "check-delete": ignored.</li>
 *   <li>No CT counterpart: grouped under "check-delete" with the row's own flag and state.</li>
 *   <li>Counterpart found but status, delete flag or user state differ
 *       (a null UC value counts as different): grouped under the CT values.</li>
 * </ul>
 *
 * @return the UC row when it was added to a group, otherwise {@code null}
 */
protected static UcUserInfoDataItem checkDataForContainer(UpdateDataContainer updateDataContainerV, String mainSyncId, String tableSharding, String mainInfoGuid, CtUserInfoDataContainer ctUserInfoDataContainerV, UcUserInfoDataItem ucUserInfoDataItemV) {
    final String ucStatus = ucUserInfoDataItemV.getDataStatus();
    if ("check-delete".equals(ucStatus)) {
        return null;
    }
    final CtUserInfoDataItem ctItem = ctUserInfoDataContainerV.findItem(ucUserInfoDataItemV.getSyncId());
    final String targetStatus;
    final String targetDel;
    final String targetState;
    if (ctItem == null) {
        targetStatus = "check-delete";
        targetDel = ucUserInfoDataItemV.getDelFlag();
        targetState = ucUserInfoDataItemV.getUserState();
    } else {
        final String expectedStatus = dataIsOk(ctItem) ? "normal" : "disabled";
        final String ucDel = ucUserInfoDataItemV.getDelFlag();
        final String ucState = ucUserInfoDataItemV.getUserState();
        final boolean inSync = expectedStatus.equals(ucStatus)
                && ucDel != null
                && ucDel.equals(ctItem.getDelFlag())
                && ucState != null
                && ucState.equals(ctItem.getUserState());
        if (inSync) {
            return null;
        }
        targetStatus = expectedStatus;
        targetDel = ctItem.getDelFlag();
        targetState = ctItem.getUserState();
    }
    final UpdateDataItem group = updateDataContainerV.findOrCreate(targetStatus, targetDel, targetState);
    group.getList().add(ucUserInfoDataItemV);
    return ucUserInfoDataItemV;
}
/**
 * Supplies the fallback user state for SQL generation.
 *
 * @param data user state text, may be null
 * @return "1" when data is null or blank, otherwise data unchanged (not trimmed)
 */
protected static String convertUserState(String data) {
    final boolean blank = data == null || data.trim().isEmpty();
    return blank ? "1" : data;
}
/**
 * Supplies the fallback delete flag for SQL generation.
 *
 * @param data delete flag text, may be null
 * @return "1" when data is null or blank, otherwise data unchanged (not trimmed)
 */
protected static String convertDeleteFlag(String data) {
    final boolean blank = data == null || data.trim().isEmpty();
    return blank ? "1" : data;
}
/**
 * Builds one batch update statement for a group of UC rows.
 *
 * @param updateDataContainerV supplies the shard suffix and the main-info guid
 * @param updateDataItemV      supplies the target status, delete flag and user state;
 *                             blank flag/state fall back via convertDeleteFlag / convertUserState
 * @param idInSql              comma-separated ids for the {@code id in (...)} filter
 * @param guidInSql            comma-separated quoted guids for the {@code account_info_guid in (...)} filter
 * @return the update statement terminated by a semicolon
 */
protected static String updateDataItemToSql(UpdateDataContainer updateDataContainerV, UpdateDataItem updateDataItemV, String idInSql, String guidInSql) {
    final String tableName = ConstName.UC_UER_TABLE_NAME + updateDataContainerV.getTableSharding();
    return String.join("\n",
            "update " + tableName,
            "SET",
            "data_status = '" + updateDataItemV.getDataStatus() + "',",
            "delete_flag = " + convertDeleteFlag(updateDataItemV.getDelFlag()) + ",",
            "user_state = " + convertUserState(updateDataItemV.getUserState()) + ",",
            "update_user = 'dev',",
            "update_time = now()",
            "where ",
            "id in ( " + idInSql + ")",
            "and ct_main_info_guid = '" + updateDataContainerV.getMainInfoGuid() + "'",
            "and account_info_guid in (" + guidInSql + ");");
}
/**
 * Renders every group of the container as batch update statements, at most
 * 900 rows per statement. Each statement is preceded by a comment line that
 * carries the given data count and a running statement index (the index
 * continues across groups).
 *
 * @param dataCount            value echoed in every comment line
 * @param updateDataContainerV the groups to render
 * @return the generated SQL text; empty when no group contains rows
 */
protected static String updateDataContainerToSql(int dataCount, UpdateDataContainer updateDataContainerV) {
    final int batchSize = 900;
    final String lineFeed = "\n";
    final StringBuilder sql = new StringBuilder();
    int sqlIndex = 0;
    for (UpdateDataItem group : updateDataContainerV.getItemList()) {
        final List<UcUserInfoDataItem> rows = group.getList();
        final int total = rows.size();
        for (int from = 0; from < total; from += batchSize) {
            final int to = Math.min(from + batchSize, total);
            final StringBuilder ids = new StringBuilder();
            final StringBuilder guids = new StringBuilder();
            for (int i = from; i < to; i++) {
                if (i > from) {
                    ids.append(",");
                    guids.append(",");
                }
                final UcUserInfoDataItem row = rows.get(i);
                ids.append(row.getPkId());
                guids.append("'" + row.getAccountInfoGuid() + "'");
            }
            ++sqlIndex;
            sql.append("-- dataCount:" + dataCount + "|sqlIndex:" + sqlIndex + " --" + lineFeed);
            sql.append(updateDataItemToSql(updateDataContainerV, group, ids.toString(), guids.toString()));
            sql.append(lineFeed);
        }
    }
    return sql.toString();
}
/**
 * Processes one main sync id: loads the CT rows, pages through the UC rows of
 * the matching shard, groups every UC row that needs a change, and appends the
 * resulting batch update SQL to the output file.
 * <p>
 * When no main-info guid is found for the sync id, nothing is compared: the
 * id is skipped and the skip is recorded on stdout and as a comment in the
 * output file. Without this guard the UC scan would run against shard "0"
 * with an empty guid filter and could emit updates for unrelated rows.
 *
 * @param dataCount    number of changed rows counted so far
 * @param filePath     file the generated SQL is appended to (UTF-8)
 * @param mainSyncId   main sync id to process
 * @param dataSourceCt CT-side data source
 * @param dataSourceUc UC-side data source
 * @return dataCount increased by the number of UC rows queued for update
 * @throws Exception on database errors or non-numeric ids
 */
protected static int runForMainSyncId(int dataCount, String filePath, String mainSyncId, DataSource dataSourceCt, DataSource dataSourceUc) throws Exception {
    final String rn = "\n";
    // Resolve the guid first so a missing mapping costs no CT load at all.
    final String mainInfoGuid = getMainInfoGuid(mainSyncId, dataSourceUc);
    if (mainInfoGuid == null || mainInfoGuid.trim().isEmpty()) {
        System.out.println("-- skip: mainSyncId:" + mainSyncId + " has no main_info_guid");
        FileUtil.appendUtf8String("-- skip: mainSyncId:" + mainSyncId + " has no main_info_guid -- " + rn + rn, filePath);
        return dataCount;
    }
    final CtUserInfoDataContainer ctUserInfoDataContainerV = createCtUserInfoDataContainer(mainSyncId, dataSourceCt);
    final String tableSharding = toTableSharding(mainInfoGuid);
    final int countData = ucCountSql(tableSharding, mainInfoGuid, dataSourceUc);
    System.out.println("-- 当前处理记录数:dataCount:" + dataCount);
    System.out.println("-- CT侧:" + mainSyncId + ":" + ctUserInfoDataContainerV.map.size());
    System.out.println("-- UC侧:mainInfoGuid:" + mainInfoGuid + ":" + countData);
    final UpdateDataContainer updateDataContainerV = UpdateDataContainer.createItem(tableSharding, mainInfoGuid);
    final StringBuilder addSb = new StringBuilder();
    // The header carries the count as it was when this sync id started.
    addSb.append("-- " + dataCount + ":" + mainInfoGuid + ":" + mainSyncId + " -- " + rn + rn);
    final Db db = Db.use(dataSourceUc);
    long gtMinId = 0L;
    while (true) {
        List<Entity> dataList = db.query(toUcSql(tableSharding, mainInfoGuid, gtMinId));
        if (dataList.isEmpty()) {
            break;
        }
        for (Entity each : dataList) {
            UcUserInfoDataItem ucUserInfoDataItemV = new UcUserInfoDataItem();
            ucUserInfoDataItemV.setPkId(Long.parseLong(each.getStr("id")));
            ucUserInfoDataItemV.setSyncId(each.getStr("sync_id"));
            ucUserInfoDataItemV.setAccountInfoGuid(each.getStr("account_info_guid"));
            ucUserInfoDataItemV.setDataStatus(each.getStr("data_status"));
            ucUserInfoDataItemV.setDelFlag(each.getStr("delete_flag"));
            ucUserInfoDataItemV.setUserState(each.getStr("user_state"));
            // Keyset pagination: the next page starts after the largest id seen.
            if (gtMinId < ucUserInfoDataItemV.getPkId()) {
                gtMinId = ucUserInfoDataItemV.getPkId();
            }
            UcUserInfoDataItem returnData = checkDataForContainer(updateDataContainerV, mainSyncId, tableSharding, mainInfoGuid, ctUserInfoDataContainerV, ucUserInfoDataItemV);
            if (returnData != null) {
                ++dataCount;
                System.out.println(dataCount + "." + ucUserInfoDataItemShow(mainSyncId, returnData));
            }
        }
    }
    addSb.append(updateDataContainerToSql(dataCount, updateDataContainerV));
    addSb.append(rn);
    FileUtil.appendUtf8String(addSb.toString(), filePath);
    return dataCount;
}
/**
 * Command-line entry point.
 * <p>
 * Arguments: {@code <ctPassword> <ucPassword> [syncIds]}, where the optional
 * third argument is a comma-separated list of main sync ids. Without it the
 * built-in list from {@code getMainSyncIdStrList()} is used. The output file
 * {@code all.txt} under {@code rootDir()} is deleted first and then filled
 * with the generated SQL. Errors are reported on stderr.
 *
 * @param args command-line arguments as described above
 */
public static void main(String[] args) {
    try {
        // Guard every use of args the same way; a missing array counts as "no arguments".
        final int argCount = args == null ? 0 : args.length;
        final String ctPwd = argCount >= 2 ? args[0] : null;
        final String ucPwd = argCount >= 2 ? args[1] : null;
        final String syncIdText = argCount >= 3 ? args[2] : null;
        if (ctPwd == null || ucPwd == null) {
            System.out.println("请输入密码");
            return;
        }
        // Same location as before ("/run/all.txt"), derived from the shared root directory.
        final String filePath = rootDir() + "/all.txt";
        FileUtil.del(filePath);
        System.out.println("删除文件:" + filePath);
        final DataSource dataSourceCt = createDataSourceForCt(ctPwd);
        final DataSource dataSourceUc = createDataSourceForUc(ucPwd);
        final String mainSyncIdAll;
        final String splitChar;
        if (syncIdText == null || syncIdText.trim().isEmpty()) {
            // Built-in list: one id per line.
            mainSyncIdAll = getMainSyncIdStrList();
            splitChar = "\n";
        } else {
            // Command-line list: comma-separated.
            mainSyncIdAll = syncIdText;
            splitChar = ",";
        }
        int dataCount = 0;
        for (String each : mainSyncIdAll.split(splitChar)) {
            if (each == null || each.trim().isEmpty()) {
                continue;
            }
            dataCount = runForMainSyncId(dataCount, filePath, each.trim(), dataSourceCt, dataSourceUc);
        }
        System.out.println("end:" + dataCount);
    } catch (SQLException e) {
        System.err.println("数据库操作错误: " + e.getMessage());
        e.printStackTrace();
    } catch (Exception e) {
        System.err.println("发生错误: " + e.getMessage());
        e.printStackTrace();
    }
}
}
javac脚本:
/app/jdk-11.0.11/bin/javac -cp .:hutool-all-5.8.38.jar:mysql-connector-j-8.0.31.jar CmdToolDbCtToBatchSql.java
java脚本:
/app/jdk-11.0.11/bin/java -cp .:hutool-all-5.8.38.jar:mysql-connector-j-8.0.31.jar CmdToolDbCtToBatchSql onePwd twoPwd
执行效果如下: