Fix self-reply loop issue when using same email for send/receive (#4)

- Add Message-ID tracking to prevent processing system-sent emails
- Track sent emails in sent-messages.json with auto-cleanup
- Skip system emails in both email-listener.js and relay-pty.js
- Extract session from token/headers/body for proper reply routing
- Reduce verbose logging in tmux-injector to debug level

Fixes #3

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
Song-Ze Yu 2025-08-01 12:28:06 +08:00 committed by GitHub
parent dc9d4f5a90
commit db20186423
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 176 additions and 15 deletions

View File

@ -17,6 +17,7 @@ class EmailChannel extends NotificationChannel {
this.transporter = null;
this.sessionsDir = path.join(__dirname, '../../data/sessions');
this.templatesDir = path.join(__dirname, '../../assets/email-templates');
this.sentMessagesPath = config.sentMessagesPath || path.join(__dirname, '../../data/sent-messages.json');
this.tmuxMonitor = new TmuxMonitor();
this._ensureDirectories();
@ -114,12 +115,16 @@ class EmailChannel extends NotificationChannel {
// Generate email content
const emailContent = this._generateEmailContent(notification, sessionId, token);
// Generate unique Message-ID
const messageId = `<${sessionId}-${Date.now()}@claude-code-remote>`;
const mailOptions = {
from: this.config.from || this.config.smtp.auth.user,
to: this.config.to,
subject: emailContent.subject,
html: emailContent.html,
text: emailContent.text,
messageId: messageId,
// Add custom headers for reply recognition
headers: {
'X-Claude-Code-Remote-Session-ID': sessionId,
@ -130,6 +135,10 @@ class EmailChannel extends NotificationChannel {
try {
const result = await this.transporter.sendMail(mailOptions);
this.logger.info(`Email sent successfully to ${this.config.to}, Session: ${sessionId}`);
// Track sent message
await this._trackSentMessage(messageId, sessionId, token);
return true;
} catch (error) {
this.logger.error('Failed to send email:', error.message);
@ -205,6 +214,38 @@ class EmailChannel extends NotificationChannel {
}
}
async _trackSentMessage(messageId, sessionId, token) {
let sentMessages = { messages: [] };
// Read existing data if file exists
if (fs.existsSync(this.sentMessagesPath)) {
try {
sentMessages = JSON.parse(fs.readFileSync(this.sentMessagesPath, 'utf8'));
} catch (e) {
this.logger.warn('Failed to read sent-messages.json, creating new one');
}
}
// Add new message
sentMessages.messages.push({
messageId: messageId,
sessionId: sessionId,
token: token,
type: 'notification',
sentAt: new Date().toISOString()
});
// Ensure directory exists
const dir = path.dirname(this.sentMessagesPath);
if (!fs.existsSync(dir)) {
fs.mkdirSync(dir, { recursive: true });
}
// Write updated data
fs.writeFileSync(this.sentMessagesPath, JSON.stringify(sentMessages, null, 2));
this.logger.debug(`Tracked sent message: ${messageId}`);
}
_generateEmailContent(notification, sessionId, token) {
const template = this._getTemplate(notification.type);
const timestamp = new Date().toLocaleString('zh-CN');

View File

@ -19,6 +19,7 @@ class EmailListener extends EventEmitter {
this.isConnected = false;
this.isListening = false;
this.sessionsDir = path.join(__dirname, '../data/sessions');
this.sentMessagesPath = config.sentMessagesPath || path.join(__dirname, '../data/sent-messages.json');
this.checkInterval = (config.template?.checkInterval || 30) * 1000; // Convert to milliseconds
this.lastCheckTime = new Date();
@ -213,6 +214,14 @@ class EmailListener extends EventEmitter {
async _handleParsedEmail(email, seqno) {
try {
// First check if this is a system-sent email
const messageId = email.headers.get('message-id');
if (await this._isSystemSentEmail(messageId)) {
this.logger.debug(`Skipping system-sent email: ${messageId}`);
await this._removeFromSentMessages(messageId);
return;
}
// Check if it's a reply email
if (!this._isReplyEmail(email)) {
this.logger.debug(`Email ${seqno} is not a TaskPing reply`);
@ -270,9 +279,9 @@ class EmailListener extends EventEmitter {
}
_isReplyEmail(email) {
// Check if subject contains TaskPing identifier
// Check if subject contains Claude-Code-Remote identifier
const subject = email.subject || '';
if (!subject.includes('[TaskPing]')) {
if (!subject.includes('[Claude-Code-Remote')) {
return false;
}
@ -290,8 +299,17 @@ class EmailListener extends EventEmitter {
_extractSessionId(email) {
// Extract from email headers
const headers = email.headers;
if (headers && headers.get('x-taskping-session-id')) {
return headers.get('x-taskping-session-id');
if (headers && headers.get('x-claude-code-remote-session-id')) {
return headers.get('x-claude-code-remote-session-id');
}
// Extract token from subject line
const subject = email.subject || '';
const tokenMatch = subject.match(/\[Claude-Code-Remote #([A-Z0-9]{6,8})\]/);
if (tokenMatch) {
const token = tokenMatch[1];
// Look up session by token
return this._getSessionIdByToken(token);
}
// Extract from email body (as backup method)
@ -311,6 +329,25 @@ class EmailListener extends EventEmitter {
return null;
}
_getSessionIdByToken(token) {
// Check session files for matching token
try {
const sessionFiles = fs.readdirSync(this.sessionsDir);
for (const file of sessionFiles) {
if (file.endsWith('.json')) {
const sessionPath = path.join(this.sessionsDir, file);
const sessionData = JSON.parse(fs.readFileSync(sessionPath, 'utf8'));
if (sessionData.token === token) {
return sessionData.id;
}
}
}
} catch (error) {
this.logger.error('Error looking up session by token:', error.message);
}
return null;
}
async _validateSession(sessionId) {
const sessionFile = path.join(this.sessionsDir, `${sessionId}.json`);
@ -431,6 +468,42 @@ class EmailListener extends EventEmitter {
}
}
}
async _isSystemSentEmail(messageId) {
if (!messageId || !fs.existsSync(this.sentMessagesPath)) {
return false;
}
try {
const sentMessages = JSON.parse(fs.readFileSync(this.sentMessagesPath, 'utf8'));
return sentMessages.messages.some(msg => msg.messageId === messageId);
} catch (error) {
this.logger.error('Error reading sent messages:', error.message);
return false;
}
}
async _removeFromSentMessages(messageId) {
if (!fs.existsSync(this.sentMessagesPath)) {
return;
}
try {
const sentMessages = JSON.parse(fs.readFileSync(this.sentMessagesPath, 'utf8'));
sentMessages.messages = sentMessages.messages.filter(msg => msg.messageId !== messageId);
// Also clean up old messages (older than 24 hours)
const oneDayAgo = new Date(Date.now() - 24 * 60 * 60 * 1000);
sentMessages.messages = sentMessages.messages.filter(msg => {
return new Date(msg.sentAt) > oneDayAgo;
});
fs.writeFileSync(this.sentMessagesPath, JSON.stringify(sentMessages, null, 2));
this.logger.debug(`Removed message ${messageId} from sent tracking`);
} catch (error) {
this.logger.error('Error removing from sent messages:', error.message);
}
}
}
module.exports = EmailListener;

View File

@ -28,6 +28,7 @@ const log = pino({
// Global configuration
const SESS_PATH = process.env.SESSION_MAP_PATH || path.join(__dirname, '../data/session-map.json');
const PROCESSED_PATH = path.join(__dirname, '../data/processed-messages.json');
const SENT_MESSAGES_PATH = path.join(__dirname, '../data/sent-messages.json');
const ALLOWED_SENDERS = (process.env.ALLOWED_SENDERS || '').split(',').map(s => s.trim().toLowerCase()).filter(Boolean);
const PTY_POOL = new Map();
let PROCESSED_MESSAGES = new Set();
@ -391,9 +392,17 @@ async function fallbackToClipboard(command) {
async function handleMailMessage(parsed) {
try {
log.debug({ uid: parsed.uid, messageId: parsed.messageId }, 'handleMailMessage called');
// Check if this is a system-sent email
const messageId = parsed.messageId;
if (await isSystemSentEmail(messageId)) {
log.info({ messageId }, 'Skipping system-sent email');
await removeFromSentMessages(messageId);
return;
}
// Simplified duplicate detection (UID already checked earlier)
const uid = parsed.uid;
const messageId = parsed.messageId;
// Only perform additional checks for emails without UID
if (!uid) {
@ -691,6 +700,44 @@ function fetchAndProcessEmails(imap, uids) {
});
}
// Check if email is system-sent
async function isSystemSentEmail(messageId) {
if (!messageId || !existsSync(SENT_MESSAGES_PATH)) {
return false;
}
try {
const sentMessages = JSON.parse(readFileSync(SENT_MESSAGES_PATH, 'utf8'));
return sentMessages.messages.some(msg => msg.messageId === messageId);
} catch (error) {
log.error({ error }, 'Error reading sent messages');
return false;
}
}
// Remove email from sent messages tracking
async function removeFromSentMessages(messageId) {
if (!existsSync(SENT_MESSAGES_PATH)) {
return;
}
try {
const sentMessages = JSON.parse(readFileSync(SENT_MESSAGES_PATH, 'utf8'));
sentMessages.messages = sentMessages.messages.filter(msg => msg.messageId !== messageId);
// Also clean up old messages (older than 24 hours)
const oneDayAgo = new Date(Date.now() - 24 * 60 * 60 * 1000);
sentMessages.messages = sentMessages.messages.filter(msg => {
return new Date(msg.sentAt) > oneDayAgo;
});
writeFileSync(SENT_MESSAGES_PATH, JSON.stringify(sentMessages, null, 2));
log.debug({ messageId }, 'Removed message from sent tracking');
} catch (error) {
log.error({ error }, 'Error removing from sent messages');
}
}
// Start service
if (require.main === module) {
startImap();

View File

@ -92,10 +92,10 @@ class TmuxInjector {
// 3. Send enter
const enterCommand = `tmux send-keys -t ${this.sessionName} C-m`;
this.log.info(`Injecting command via tmux: ${command}`);
this.log.info(`Step 1 - Clear: ${clearCommand}`);
this.log.info(`Step 2 - Send: ${sendCommand}`);
this.log.info(`Step 3 - Enter: ${enterCommand}`);
this.log.debug(`Injecting command via tmux: ${command}`);
// this.log.debug(`Step 1 - Clear: ${clearCommand}`);
// this.log.debug(`Step 2 - Send: ${sendCommand}`);
// this.log.debug(`Step 3 - Enter: ${enterCommand}`);
// Execute three steps
exec(clearCommand, (clearError) => {
@ -123,7 +123,7 @@ class TmuxInjector {
return;
}
this.log.info('Command sent successfully in 3 steps');
this.log.debug('Command sent successfully in 3 steps');
// Brief wait for command sending
await new Promise(r => setTimeout(r, 1000));
@ -131,7 +131,7 @@ class TmuxInjector {
// Check if command is already displayed in Claude
const capture = await this.getCaptureOutput();
if (capture.success) {
this.log.info(`Claude state after injection: ${capture.output.slice(-200).replace(/\n/g, ' ')}`);
this.log.debug(`Claude state after injection: ${capture.output.slice(-200).replace(/\n/g, ' ')}`);
}
// Wait and check if confirmation is needed
@ -172,7 +172,7 @@ class TmuxInjector {
}
const output = capture.output;
this.log.info(`Confirmation check ${attempts}: ${output.slice(-200).replace(/\n/g, ' ')}`);
this.log.debug(`Confirmation check ${attempts}: ${output.slice(-200).replace(/\n/g, ' ')}`);
// Check for multi-option confirmation dialog (priority handling)
if (output.includes('Do you want to proceed?') &&
@ -297,7 +297,7 @@ class TmuxInjector {
!output.includes('Do you want to proceed?') &&
!output.includes('1. Yes') &&
!output.includes('(y/n)')) {
this.log.info('New input prompt detected, command likely completed');
this.log.debug('New input prompt detected, command likely completed');
break;
}
@ -319,7 +319,7 @@ class TmuxInjector {
// Final state check
const finalCapture = await this.getCaptureOutput();
if (finalCapture.success) {
this.log.info(`Final state: ${finalCapture.output.slice(-100).replace(/\n/g, ' ')}`);
this.log.debug(`Final state: ${finalCapture.output.slice(-100).replace(/\n/g, ' ')}`);
}
}
@ -358,7 +358,7 @@ class TmuxInjector {
// Complete command injection workflow
async injectCommandFull(token, command) {
try {
this.log.info(`🎯 Starting tmux command injection (Token: ${token})`);
this.log.debug(`Starting tmux command injection (Token: ${token})`);
// 1. Check if tmux is available
const tmuxAvailable = await this.checkTmuxAvailable();