using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using System.Diagnostics;
using System.IO;
using System.Transactions;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Data.SQLite;

namespace RDH.PharmacyPlatform.Sync.Core
{
    /// <summary>
    /// Static entry point of the sync core. It creates the local SQLite message store,
    /// opens the RabbitMQ connection/channel/consumer and starts two background threads
    /// that drain the local inbound and outbound message tables.
    /// </summary>
    public class Bootstrap
    {
        #region Fields

        #region Rabbitmq fields
        private static ConnectionFactory _connectionFactory;
        // Connection from this client to the MQ server.
        private static IConnection _connection;
        // Channel associated with the connection.
        private static IModel _channel;
        private static IBasicProperties _channelProperties;
        // The message queue declared for this node.
        private static QueueDeclareOk _queue;
        private static EventingBasicConsumer _consumer;
        #endregion

        private static Config _config;
        private static IMessageHandler _messageHandler;
        private static ILogger _logger;
        private static readonly String _dbFileFullName;
        private static readonly String _dbConnectionString;
        private static Thread _threadKeepMessageOut;
        private static Thread _threadKeepMessageIn;
        // Written by Init/Exit and polled by both worker threads; volatile so that
        // the workers always observe the latest value and can actually stop.
        private static volatile Boolean _isInited;
        private const String _ConfigFilePath = "sync.config";
        private const String _DbFilePath = "sync.db";

        #endregion

        #region Constructors

        /// <summary>
        /// Resolves the database file path (relative to the application base directory)
        /// and builds the matching SQLite connection string.
        /// </summary>
        static Bootstrap()
        {
            _dbFileFullName = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, _DbFilePath);
            _dbConnectionString = $"Data Source={_dbFileFullName};Version=3;";
        }

        /// <summary>
        /// Calls <see cref="Exit"/> when an instance is finalized.
        /// </summary>
        ~Bootstrap()
        {
            try
            {
                Exit();
            }
            catch
            {
                // An exception escaping a finalizer terminates the process, so nothing
                // may propagate from here (a registered logger could still throw).
            }
        }

        #endregion

        #region Properties

        /// <summary>The configuration accepted by the last successful <c>Init</c> call, or null.</summary>
        public static Config Config
        {
            get => _config;
            internal set { _config = value; }
        }

        #region Internal
        internal static ConnectionFactory MqConnectionFacotry { get => _connectionFactory; }
        internal static IModel MqChannel { get => _channel; }
        internal static IBasicProperties ChannelProperties { get => _channelProperties; }
        internal static IMessageHandler MessageHandler { get => _messageHandler; }
        internal static String DbFileFullName { get => _dbFileFullName; }
        internal static String DbConnectionString { get => _dbConnectionString; }
        #endregion

        #endregion

        #region Methods

        #region Public

        /// <summary>
        /// Initializes from the configuration file "sync.config" located in the
        /// application base directory.
        /// </summary>
        /// <returns>An error message, or null on success.</returns>
        /// <remarks>
        /// Exceptions thrown while reading or deserializing the file are not caught
        /// here and propagate to the caller.
        /// </remarks>
        public static String Init()
        {
            String configFullName = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, _ConfigFilePath);
            if (!File.Exists(configFullName))
            {
                return "配置文件不存在";
            }

            String configContent = File.ReadAllText(configFullName);
            Config config = JsonSerializer.DeserializeObject(configContent);
            return Init(config);
        }

        /// <summary>
        /// Initializes using the specified configuration: ensures the local database
        /// exists, connects to the MQ server, declares and consumes this node's queue,
        /// and starts the inbound/outbound worker threads.
        /// </summary>
        /// <param name="config">Configuration to use; must not be null.</param>
        /// <returns>
        /// An error message, or null on success. Null is also returned, without doing
        /// anything further, when a configuration is already set.
        /// </returns>
        public static String Init(Config config)
        {
            InitDatabase();

            if (_config != null)
            {
                return null;
            }
            if (config == null)
            {
                return "参数不能为空";
            }

            _config = config;

            #region Basic connection parameters
            if (_connectionFactory == null)
            {
                _connectionFactory = new ConnectionFactory
                {
                    UserName = config.ServerUserName,
                    Password = config.ServerUserPassword,
                    HostName = config.ServerIpHost,
                    Port = config.ServerSyncPort,
                    // Let the client re-establish the connection automatically.
                    AutomaticRecoveryEnabled = true,
                };
            }
            #endregion

            try
            {
                #region Create the connection, channel, queue and subscriptions
                _connection = _connectionFactory.CreateConnection();
                _channel = _connection.CreateModel();

                _channelProperties = _channel.CreateBasicProperties();
                _channelProperties.DeliveryMode = 2; // 2 = persistent
                _channelProperties.ContentType = "text/plain";

                _channel.BasicReturn += _channel_BasicReturn;
                _channel.BasicAcks += _channel_BasicAcks;
                _channel.BasicNacks += _channel_BasicNacks;

                // Arguments: queue name, durable, exclusive, autoDelete.
                _queue = _channel.QueueDeclare(config.LocalNodeId, true, true, false);

                _consumer = new EventingBasicConsumer(_channel);
                // Subscribe BEFORE starting consumption so that no delivery can arrive
                // while the event still has no handler attached.
                _consumer.Received += _cusumer_Received;

                // autoAck
                //   true:  the server treats a message as acknowledged once delivered
                //   false: the consumer must acknowledge each message explicitly
                _channel.BasicConsume(_queue.QueueName, false, _consumer);
                #endregion

                _isInited = true;
            }
            catch (Exception ex)
            {
                Log($"{ex.Message}\n{ex.StackTrace}");

                // Release whatever was opened before the failure and forget the factory
                // so that a retry with a corrected configuration starts from scratch.
                CloseConnection();
                _connectionFactory = null;
                _config = null;
                return ex.Message;
            }

            _threadKeepMessageIn = new Thread(ThreadKeepMessageInHandler);
            _threadKeepMessageIn.IsBackground = true;
            _threadKeepMessageIn.Start();

            _threadKeepMessageOut = new Thread(ThreadKeepMessageOutHandler);
            _threadKeepMessageOut.IsBackground = true;
            _threadKeepMessageOut.Start();

            return null;
        }

        /// <summary>
        /// Signals the worker threads to stop and closes the MQ channel and connection.
        /// </summary>
        /// <remarks>
        /// The stored configuration is left in place, so a later <c>Init</c> call
        /// returns immediately without reconnecting.
        /// </remarks>
        public static void Exit()
        {
            _isInited = false;
            CloseConnection();
        }

        /// <summary>
        /// Registers the message handler that receives inbound messages.
        /// </summary>
        public static void RegisteMessageHandler(IMessageHandler handler)
        {
            _messageHandler = handler;
        }

        /// <summary>
        /// Registers the logger used by <see cref="Log"/>.
        /// </summary>
        public static void RegisteLogger(ILogger logger)
        {
            _logger = logger;
        }

        #endregion

        #region Internal

        /// <summary>
        /// Creates the local SQLite database file and its tables when the file does not
        /// exist yet. On failure the error is logged and the partially created file is
        /// removed; no exception is propagated.
        /// </summary>
        internal static void InitDatabase()
        {
            if (File.Exists(_dbFileFullName))
            {
                return;
            }

            Log("创建消息数据库");
            try
            {
                #region Create the database
                SQLiteConnection.CreateFile(_dbFileFullName);
                Log("创建消息表");
                using (SQLiteConnection conn = new SQLiteConnection(_dbConnectionString))
                {
                    conn.Open();
                    using (SQLiteCommand cmd = conn.CreateCommand())
                    {
                        cmd.CommandType = System.Data.CommandType.Text;

                        cmd.CommandText = "CREATE TABLE T_Message_Data_In ("
                            + " Create_Time DATETIME NOT NULL,"
                            + " Source_Id VARCHAR(50) NOT NULL,"
                            + " Message_Content VARCHAR(5000),"
                            + " Message_Content_Type VARCHAR(50),"
                            + " Filter_Type INTEGER(1) NOT NULL,"
                            + " Target_Id VARCHAR(50) NOT NULL)";
                        cmd.ExecuteNonQuery();

                        cmd.CommandText = "CREATE TABLE T_Message_Data_Out ("
                            + " Create_Time DATETIME NOT NULL,"
                            + " Source_Id VARCHAR(50) NOT NULL,"
                            + " Message_Content VARCHAR(5000),"
                            + " Message_Content_Type VARCHAR(50),"
                            + " Filter_Type INTEGER(1) NOT NULL,"
                            + " Target_Id VARCHAR(50) NOT NULL)";
                        cmd.ExecuteNonQuery();

                        cmd.CommandText = "CREATE TABLE T_Nodes ("
                            + " Node_Id VARCHAR(50) PRIMARY KEY,"
                            + " Group_Name VARCHAR(50),"
                            + " Create_Time DATETIME)";
                        cmd.ExecuteNonQuery();
                    }
                }
                #endregion
            }
            catch (Exception ex)
            {
                Log($"{ex.Message}\n{ex.StackTrace}");

                // Remove the half-created file so the next call retries the creation
                // instead of finding a database without tables.
                try
                {
                    if (File.Exists(_dbFileFullName))
                    {
                        File.Delete(_dbFileFullName);
                    }
                }
                catch (Exception deleteEx)
                {
                    Log($"{deleteEx.Message}\n{deleteEx.StackTrace}");
                }
            }
        }

        /// <summary>
        /// Writes <paramref name="content"/> to the registered logger, or to the debug
        /// output (with timestamp and managed thread id) when no logger is registered.
        /// </summary>
        internal static void Log(String content)
        {
            // Work on a snapshot so a concurrent RegisteLogger(null) cannot slip in
            // between the null check and the call.
            ILogger logger = _logger;
            if (logger != null)
            {
                logger.Log(content);
                return;
            }

            Debug.WriteLine(String.Format("{0} TID:{1} Log:{2}",
                DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"),
                Thread.CurrentThread.ManagedThreadId.ToString(),
                content));
        }

        #endregion

        #region Private

        /// <summary>
        /// Consumer callback: persists the received message into the local inbound
        /// table and then acknowledges it. When persisting fails the error is logged
        /// and the message is not acknowledged. Nothing is persisted or acknowledged
        /// while no message handler is registered.
        /// </summary>
        private static void _cusumer_Received(object sender, BasicDeliverEventArgs e)
        {
            Log("_cusumer_Received");

            if (Bootstrap.MessageHandler == null)
            {
                Log("未注册MessageHandler");
                return;
            }

            try
            {
                String content = Encoding.UTF8.GetString(e.Body.ToArray());
                Log($"Content:{content}");
                MessageData data = JsonSerializer.DeserializeObject(content);

                // Persist the message locally right away, before acknowledging it.
                using (ConnectionSessionScope conn = new ConnectionSessionScope(_dbConnectionString))
                {
                    MessageDataInDAL messageDataInDAL = new MessageDataInDAL();
                    messageDataInDAL.InsertModel(data);
                }

                _channel.BasicAck(e.DeliveryTag, false);
            }
            catch (Exception ex)
            {
                Bootstrap.Log($"{ex.Message}\n{ex.StackTrace}");
            }
        }

        /// <summary>Logs that the channel raised its BasicAcks event.</summary>
        private static void _channel_BasicAcks(object sender, BasicAckEventArgs e)
        {
            Log("_channel_BasicAcks");
        }

        /// <summary>Logs that the channel raised its BasicNacks event.</summary>
        private static void _channel_BasicNacks(object sender, BasicNackEventArgs e)
        {
            Log("_channel_BasicNacks");
        }

        /// <summary>
        /// Handles a message that was returned after sending because no target was
        /// found: the message is re-submitted with a fresh creation time.
        /// </summary>
        private static void _channel_BasicReturn(object sender, BasicReturnEventArgs e)
        {
            Log("_channel_BasicReturn");
            try
            {
                MessageData data = JsonSerializer.DeserializeObject(Encoding.UTF8.GetString(e.Body.ToArray()));
                data.CreateTime = DateTime.Now;
                MessageClient.SendMessage(data);
            }
            catch (Exception ex)
            {
                // Keep a malformed or unsendable returned message from throwing out of
                // the channel's event callback.
                Log($"{ex.Message}\n{ex.StackTrace}");
            }
        }

        /// <summary>
        /// Worker thread body that sends locally stored outbound messages. While the
        /// component is initialized it drains the outbound table, deleting each message
        /// whose send call reports success, then sleeps for one second.
        /// </summary>
        private static void ThreadKeepMessageOutHandler()
        {
            MessageDataOutDAL messageDataOutDAL = new MessageDataOutDAL();

            while (_isInited)
            {
                // The try wraps the scope as well: an exception escaping a thread's
                // entry method would otherwise take the whole process down.
                try
                {
                    using (ConnectionSessionScope conn = new ConnectionSessionScope(_dbConnectionString))
                    {
                        MessageData messageOut;
                        while (_isInited && (messageOut = messageDataOutDAL.Peek()) != null)
                        {
                            Debug.WriteLine($"读取到待发送的消息, id:{Thread.CurrentThread.ManagedThreadId}");
                            String sendState = MessageClient.InternalSendMessage(messageOut);
                            if (sendState == null)
                            {
                                // The message is removed as soon as the send call reports
                                // success; publisher confirms are not awaited here.
                                Debug.WriteLine($"发送成功,删除消息, id:{Thread.CurrentThread.ManagedThreadId}");
                                messageDataOutDAL.DeleteModel(messageOut);
                            }
                            else
                            {
                                Log("发送消息失败:" + sendState);
                            }

                            Thread.Sleep(1);
                        }
                    }
                }
                catch (Exception ex)
                {
                    Log($"{ex.Message}\n{ex.StackTrace}");
                }

                Thread.Sleep(1000);
            }
        }

        /// <summary>
        /// Worker thread body that dispatches locally stored inbound messages to the
        /// registered handler. A message is deleted when the handler returns null;
        /// otherwise it is replaced, inside a transaction, by a copy carrying a fresh
        /// creation time. After each pass the thread sleeps for one second.
        /// </summary>
        private static void ThreadKeepMessageInHandler()
        {
            Debug.WriteLine($"ThreadKeepMessageInHandler start, id:{Thread.CurrentThread.ManagedThreadId}");
            MessageDataInDAL messageDataInDAL = new MessageDataInDAL();

            while (_isInited)
            {
                try
                {
                    using (ConnectionSessionScope conn = new ConnectionSessionScope(_dbConnectionString))
                    {
                        MessageData messageIn;
                        while (_isInited && (messageIn = messageDataInDAL.Peek()) != null)
                        {
                            // Snapshot the handler: it may be unregistered at any time.
                            IMessageHandler handler = _messageHandler;
                            if (handler == null)
                            {
                                Log("未注册MessageHandler");
                                break;
                            }

                            Debug.WriteLine($"读取到待处理的消息, id:{Thread.CurrentThread.ManagedThreadId}");
                            // GetAwaiter().GetResult() blocks like .Result but rethrows the
                            // handler's own exception instead of an AggregateException, so
                            // the log below shows the real failure.
                            String handleState = handler.ReceivedMessage(messageIn).GetAwaiter().GetResult();
                            Debug.WriteLine($"处理消息 end, id:{Thread.CurrentThread.ManagedThreadId}");

                            if (handleState == null)
                            {
                                Debug.WriteLine($"处理成功,删除消息, id:{Thread.CurrentThread.ManagedThreadId}");
                                messageDataInDAL.DeleteModel(messageIn);
                            }
                            else
                            {
                                // Handling failed: replace the stored row with a copy that
                                // has a new creation time.
                                MessageData copy = new MessageData()
                                {
                                    CreateTime = DateTime.Now,
                                    MessageContent = messageIn.MessageContent,
                                    MessageContentType = messageIn.MessageContentType,
                                    FilterType = messageIn.FilterType,
                                    SourceId = messageIn.SourceId,
                                    TargetId = messageIn.TargetId,
                                };

                                conn.BeginTransaction();
                                messageDataInDAL.DeleteModel(messageIn);
                                messageDataInDAL.InsertModel(copy);
                                conn.CommitTransaction();
                            }

                            Thread.Sleep(1);
                        }
                    }
                }
                catch (Exception ex)
                {
                    Log($"{ex.Message}\n{ex.StackTrace}");
                }

                Thread.Sleep(1000);
            }
        }

        /// <summary>
        /// Closes the channel and then the connection, clearing both fields. A failure
        /// while closing one of them is logged and does not prevent closing the other.
        /// </summary>
        private static void CloseConnection()
        {
            IModel channel = _channel;
            _channel = null;
            if (channel != null)
            {
                try
                {
                    channel.Close();
                }
                catch (Exception ex)
                {
                    Log($"{ex.Message}\n{ex.StackTrace}");
                }
            }

            IConnection connection = _connection;
            _connection = null;
            if (connection != null)
            {
                try
                {
                    connection.Close();
                }
                catch (Exception ex)
                {
                    Log($"{ex.Message}\n{ex.StackTrace}");
                }
            }
        }

        #endregion

        #endregion
    }
}