This commit is contained in:
Lee
2024-01-03 07:25:17 +00:00
parent 4ed5d2af84
commit e603f65fdc
18 changed files with 1350 additions and 639 deletions

View File

@ -1,158 +0,0 @@
import SQLiteDatabase from "better-sqlite3";
import cron from "node-cron";
import Server from "../server/server";
import { logger } from "../utils/logger";
import { getFormattedDate } from "../utils/timeUtils";
import Config from "../../data/config.json";
import { Ping } from "../types/ping";
import { createDirectory } from "../utils/fsUtils";
const DATA_DIR = "data";
const BACKUP_DIR = `${DATA_DIR}/database-backups`;
const PINGS_TABLE = "pings";
const RECORD_TABLE = "record";
/**
* SQL Queries
*/
const CREATE_PINGS_TABLE = `
CREATE TABLE IF NOT EXISTS pings (
id INTEGER NOT NULL,
timestamp BIGINT NOT NULL,
ip TINYTEXT NOT NULL,
playerCount MEDIUMINT NOT NULL
);
`;
const CREATE_RECORD_TABLE = `
CREATE TABLE IF NOT EXISTS record (
id INTEGER PRIMARY KEY,
timestamp BIGINT NOT NULL,
ip TINYTEXT NOT NULL,
playerCount MEDIUMINT NOT NULL
);
`;
const CREATE_PINGS_INDEX = `CREATE INDEX IF NOT EXISTS ip_index ON pings (id, ip, playerCount)`;
const CREATE_TIMESTAMP_INDEX = `CREATE INDEX IF NOT EXISTS timestamp_index on PINGS (id, timestamp)`;
const INSERT_PING = `
INSERT INTO ${PINGS_TABLE} (id, timestamp, ip, playerCount)
VALUES (?, ?, ?, ?)
`;
const INSERT_RECORD = `
INSERT INTO ${RECORD_TABLE} (id, timestamp, ip, playerCount)
VALUES (?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
timestamp = excluded.timestamp,
playerCount = excluded.playerCount,
ip = excluded.ip
`;
const SELECT_PINGS = `
SELECT * FROM ${PINGS_TABLE} WHERE id = ? AND timestamp >= ? AND timestamp <= ?
`;
const SELECT_RECORD = `
SELECT playerCount, timestamp FROM ${RECORD_TABLE} WHERE {} = ?
`;
const SELECT_RECORD_BY_ID = SELECT_RECORD.replace("{}", "id");
const SELECT_RECORD_BY_IP = SELECT_RECORD.replace("{}", "ip");
export default class Database {
private db: SQLiteDatabase.Database;
constructor() {
this.db = new SQLiteDatabase(`${DATA_DIR}/db.sqlite`);
this.db.pragma("journal_mode = WAL"); // Enable WAL mode for better performance
logger.info("Ensuring tables exist");
this.db.exec(CREATE_PINGS_TABLE); // Ensure the pings table exists
this.db.exec(CREATE_RECORD_TABLE); // Ensure the record table exists
logger.info("Ensuring indexes exist");
this.db.exec(CREATE_PINGS_INDEX); // Ensure the pings index exists
this.db.exec(CREATE_TIMESTAMP_INDEX); // Ensure the timestamp index exists
cron.schedule(Config.backup.cron, () => {
this.createBackup();
});
}
/**
* Gets the pings for a server.
*
* @param id the server ID
* @param startTime the start time
* @param endTime the end time
* @returns the pings for the server
*/
public getPings(id: number, startTime: number, endTime: number): Ping[] | [] {
return this.db.prepare(SELECT_PINGS).all(id, startTime, endTime) as
| Ping[]
| [];
}
/**
* Gets the record player count for a server.
*
* @param value the server ID or IP
* @returns the record for the server
*/
public getRecord(value: any): Ping | undefined {
if (typeof value === "number") {
return this.db.prepare(SELECT_RECORD_BY_ID).get(value) as
| Ping
| undefined;
}
return this.db.prepare(SELECT_RECORD_BY_IP).get(value) as Ping | undefined;
}
/**
* Creates a full backup of the database.
*/
public async createBackup() {
logger.info("Creating database backup");
createDirectory(BACKUP_DIR);
await this.db.backup(`${BACKUP_DIR}/${getFormattedDate()}.sqlite`);
logger.info("Finished creating database backup");
}
/**
* Inserts a ping into the database.
*
* @param timestamp the timestamp of the ping
* @param ip the IP address of the server
* @param playerCount the number of players online
*/
public insertPing(server: Server, response: Ping) {
const { timestamp, playerCount } = response;
const id = server.getID();
const ip = server.getIP();
const statement = this.db.prepare(INSERT_PING);
statement.run(id, timestamp, ip, playerCount); // Insert the ping into the database
}
/**
* Inserts a record into the database.
*
* @param server the server to insert
* @param response the response to insert
* @returns true if the a new record was set, false otherwise
*/
public insertRecord(server: Server, response: Ping): boolean {
const { timestamp, playerCount } = response;
const id = server.getID();
const ip = server.getIP();
const oldRecord = this.getRecord(id);
if (oldRecord && oldRecord.playerCount >= playerCount) {
return false; // Don't update the record if the player count is lower
}
const statement = this.db.prepare(INSERT_RECORD);
statement.run(id, timestamp, ip, playerCount); // Insert the record into the database
return true;
}
}

View File

@ -1,14 +1,6 @@
import Database from "./database/database";
import Influx from "./influx/influx";
import Scanner from "./scanner/scanner";
import ServerManager from "./server/serverManager";
import WebsocketServer from "./websocket/websocket";
import Config from "../data/config.json";
/**
* The database instance.
*/
export const database = new Database();
/**
* The server manager instance.
@ -16,9 +8,9 @@ export const database = new Database();
export const serverManager = new ServerManager();
/**
* The websocket server instance.
* The influx database instance.
*/
export const websocketServer = new WebsocketServer(Config.websocket.port);
export const influx = new Influx();
(async () => {
await serverManager.init();
@ -26,18 +18,3 @@ export const websocketServer = new WebsocketServer(Config.websocket.port);
// The scanner is responsible for scanning all servers
new Scanner();
})();
// The websocket server is responsible for
// sending data to the client in real time
// serverManager.getServers().forEach((server) => {
// const record = database.getRecord(server.getID());
// if (!record) {
// return;
// }
// console.log(
// `Record for "${server.getName()}": ${record.playerCount} (${formatTimestamp(
// record.timestamp
// )})`
// );
// });

34
src/influx/influx.ts Normal file
View File

@ -0,0 +1,34 @@
import { InfluxDB, Point, WriteApi } from "@influxdata/influxdb-client";
import Config from "../../data/config.json";
import { logger } from "../utils/logger";
export default class Influx {
private influx: InfluxDB;
private writeApi: WriteApi;
constructor() {
logger.info("Loading influx database");
this.influx = new InfluxDB({
url: Config.influx.url,
token: Config.influx.token,
});
logger.info("InfluxDB initialized");
this.writeApi = this.influx.getWriteApi(
Config.influx.org,
Config.influx.bucket,
"ms"
);
}
/**
* Write a point to the database.
*
* @param point the point to write
*/
public async writePoint(point: Point): Promise<void> {
this.writeApi.writePoint(point);
}
}

View File

@ -1,7 +1,6 @@
import cron from "node-cron";
import { database, serverManager, websocketServer } from "..";
import Server, { ServerStatus } from "../server/server";
import { serverManager } from "..";
import Config from "../../data/config.json";
import { logger } from "../utils/logger";
@ -22,43 +21,9 @@ export default class Scanner {
// ping all servers in parallel
await Promise.all(
serverManager.getServers().map((server) => this.scanServer(server))
serverManager.getServers().map((server) => server.pingServer())
);
logger.info("Finished scanning servers");
}
/**
* Scans a server and inserts the ping into the database.
*
* @param server the server to scan
* @returns a promise that resolves when the server has been scanned
*/
async scanServer(server: Server): Promise<void> {
//logger.info(`Scanning server ${server.getIP()} - ${server.getType()}`);
let response;
let online = false;
try {
response = await server.pingServer();
if (response == undefined) {
return; // Server is offline
}
online = true;
} catch (err) {
logger.info(`Failed to ping ${server.getIP()}`, err);
websocketServer.sendServerError(server, ServerStatus.OFFLINE);
return;
}
if (!online || !response) {
return; // Server is offline
}
database.insertPing(server, response);
const isNewRecord = database.insertRecord(server, response);
// todo: send all server pings at once
websocketServer.sendNewPing(server, response, isNewRecord);
}
}

View File

@ -2,8 +2,11 @@ import javaPing from "mcping-js";
import { ResolvedServer, resolveDns } from "../utils/dnsResolver";
const bedrockPing = require("mcpe-ping-fixed"); // Doesn't have typescript definitions
import { Point } from "@influxdata/influxdb-client";
import { influx } from "..";
import Config from "../../data/config.json";
import { Ping } from "../types/ping";
import { logger } from "../utils/logger";
/**
* The type of server.
@ -79,23 +82,43 @@ export default class Server {
/**
* Pings a server and gets the response.
*
* @param server the server to ping
* @param insertPing whether to insert the ping into the database
* @returns the ping response or undefined if the server is offline
*/
public pingServer(): Promise<Ping | undefined> {
switch (this.getType()) {
case "PC": {
return this.pingPCServer();
public async pingServer(): Promise<Ping | undefined> {
try {
let response;
switch (this.getType()) {
case "PC": {
response = await this.pingPCServer();
break;
}
case "PE": {
response = await this.pingPEServer();
break;
}
}
case "PE": {
return this.pingPEServer();
if (!response) {
return Promise.resolve(undefined);
}
default: {
throw new Error(
`Unknown server type ${this.getType()} for ${this.getName()}`
try {
influx.writePoint(
new Point("playerCount")
.tag("id", this.getID().toString())
.tag("ip", this.getIP().toLowerCase())
.intField("playerCount", response.playerCount)
.timestamp(response.timestamp)
);
} catch (err) {
logger.warn(`Failed to write ping to influxdb`, err);
}
return Promise.resolve(response);
} catch (err) {
logger.warn(`Failed to ping ${this.getIP()}`, err);
return Promise.resolve(undefined);
}
}
@ -133,7 +156,7 @@ export default class Server {
const serverPing = new javaPing.MinecraftServer(ip, port);
return new Promise((resolve, reject) => {
serverPing.ping(Config.scanner.timeout, 700, (err, res) => {
serverPing.ping(Config.scanner.timeout, 765, (err, res) => {
if (err || res == undefined) {
return reject(err);
}

View File

@ -1,6 +1,7 @@
import Server, { ServerType } from "./server";
import Servers from "../../data/servers.json";
import { logger } from "../utils/logger";
export default class ServerManager {
private servers: Server[] = [];
@ -11,6 +12,7 @@ export default class ServerManager {
* Loads the servers from the config file.
*/
async init() {
logger.info("Loading servers");
for (const configServer of Servers) {
const server = new Server({
id: configServer.id,
@ -18,11 +20,17 @@ export default class ServerManager {
name: configServer.name,
type: configServer.type as ServerType,
});
try {
await server.pingServer();
} catch (err) {}
this.servers.push(server);
}
// do an inital ping of all servers to load data from them
await Promise.all(
this.servers.map((server) => {
try {
server.pingServer();
} catch (err) {}
})
);
logger.info(`Loaded ${this.servers.length} servers`);
}
/**

View File

@ -1,82 +0,0 @@
import { Socket, Server as SocketServer } from "socket.io";
import { serverManager } from "..";
import Server, { ServerStatus } from "../server/server";
import { Ping } from "../types/ping";
import { logger } from "../utils/logger";
export default class WebsocketServer {
private server: SocketServer;
constructor(port: number) {
logger.info(`Starting websocket server on port ${port}`);
this.server = new SocketServer(port);
this.server.on("connection", (socket) => {
logger.debug("ws: Client connected");
this.sendServerList(socket);
});
}
/**
* Sends the server list to the given socket.
*
* @param socket the socket to send the server list to
*/
public sendServerList(socket: Socket): void {
logger.debug(`ws: Sending server list to ${socket.id}`);
const servers = [];
for (const server of serverManager.getServers()) {
servers.push({
id: server.getID(),
name: server.getName(),
ip: server.getIP(),
port: server.getPort(),
favicon: server.getFavicon(),
});
}
socket.emit("serverList", servers);
}
/**
* Sends the latest ping data for the given server to all clients.
*
* @param server the server to send the ping for
* @param pingResponse the ping data to send
* @param isNewRecord whether a new record has been set
*/
public sendNewPing(
server: Server,
pingResponse: Ping,
isNewRecord: boolean
): void {
logger.debug(`ws: Sending new ping for ${server.getName()}`);
this.server.emit("newPing", {
server: server.getID(),
timestamp: pingResponse.timestamp,
playerCount: pingResponse.playerCount,
});
if (isNewRecord) {
logger.debug(`ws: Sending new record for ${server.getName()}`);
this.server.emit("newRecord", {
server: server.getID(),
timestamp: pingResponse.timestamp,
playerCount: pingResponse.playerCount,
});
}
}
/**
* Sends the server status for the given server to all clients.
*
* @param server the server to send the status for
* @param status the status to send
*/
public sendServerError(server: Server, status: ServerStatus): void {
logger.debug(`ws: Sending server status for ${server.getName()}`);
this.server.emit("serverStatus", {
server: server.getID(),
status: status,
});
}
}