仿聚宽模拟交易系统之二

博主: Simon Lin 创建于: Jun 15, 2020 更新于: Jun 15, 2020
分类: stock
标签: finance stock

仿聚宽模拟交易系统之二–时间轴模拟

我现在真的是有点懒,懒得分享。拖啊拖的,就把要分享的东西拖没了。只能在还能想到的时候抽自己一鞭子。

模拟交易系统的一个核心是模拟时间的运行:在各个时刻,可以随意地添加事件进行处理。前文已经说过,时间模拟采用一个简单的循环就可以了;再配合观察者模式,把时间的推进作为事件输出,动态添加的事件作为观察者,观察到事件后就进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
from datetime import datetime,date,time,timedelta 

class Observer:
    """Base interface for objects that want to be told about clock ticks.

    Both hooks are no-ops here; subclasses override them.
    """

    def update(self, temp):
        """Receive the value published by the subject (no-op by default)."""
        return None

    def display(self):
        """Run the business logic for a matched event (no-op by default)."""
        return None

class Subject:
    """Base interface for an event source that observers can subscribe to.

    Every method is a no-op here; concrete subjects override them.
    """

    def registerObserver(self, observer):
        """Subscribe *observer* (no-op by default)."""
        return None

    def removeObserver(self, observer):
        """Unsubscribe *observer* (no-op by default)."""
        return None

    def notifyObservers(self):
        """Publish the current state to all observers (no-op by default)."""
        return None

class Clock(Subject):
    """Simulated clock that publishes each tick to registered observers.

    The clock advances one minute at a time. When a tick reaches 16:00 or
    later it jumps 16 hours ahead, and ticks that land on a Saturday or
    Sunday are moved forward to the following Monday. Every tick is passed
    to each observer's ``update`` method as a ``datetime``.
    """

    def __init__(self, start_date=None, end_date=None):
        """Create a clock running from *start_date* towards *end_date*.

        Args:
            start_date: ``'YYYY-MM-DD'`` string, ``date`` or ``datetime``.
                Defaults to 2019-01-01.
            end_date: ``'YYYY-MM-DD'`` string, ``date`` or ``datetime``.
                Defaults to the moment the clock is created.

        Raises:
            ValueError: If a string argument is not in ``YYYY-MM-DD`` form.
        """
        self.observers = []

        if start_date is None:
            self.start_date = datetime.strptime('2019-01-01', '%Y-%m-%d')
        else:
            self.start_date = self._to_datetime(start_date)

        if end_date is None:
            self.end_date = datetime.now()
        else:
            # Normalised the same way as start_date so that the comparison
            # in schedule() never mixes str and datetime.
            self.end_date = self._to_datetime(end_date)
        self.current_datetime = self.start_date

    @staticmethod
    def _to_datetime(value):
        """Return *value* (str, date or datetime) as a ``datetime``."""
        if isinstance(value, datetime):
            return value
        if isinstance(value, date):
            return datetime(value.year, value.month, value.day)
        return datetime.strptime(value, '%Y-%m-%d')

    def registerObserver(self, observer):
        """Subscribe *observer* to every future tick."""
        self.observers.append(observer)

    def removeObserver(self, observer):
        """Unsubscribe *observer*; raises ``ValueError`` if not registered."""
        self.observers.remove(observer)

    def notifyObservers(self):
        """Pass the current datetime to each observer's ``update``."""
        # Iterate over a snapshot so an observer may unregister itself from
        # inside update() without causing the next observer to be skipped.
        for item in list(self.observers):
            item.update(self.current_datetime)

    def schedule(self):
        """Tick minute by minute until the clock is no longer before end_date.

        The end check happens before each tick, so the final tick may land
        on or after ``end_date``.
        """
        while self.current_datetime < self.end_date:
            self.addMinute()
        print('schedule done')

    def getCurrentDate(self):
        """Return the date part of the current simulated datetime."""
        return self.current_datetime.date()

    def getCurrentTime(self):
        """Return the time-of-day part of the current simulated datetime."""
        return self.current_datetime.time()

    def measurementsChanged(self):
        """Announce that the simulated time has changed."""
        self.notifyObservers()

    def addMinute(self):
        """Advance to the next tick and notify observers."""
        moment = self.current_datetime + timedelta(minutes=1)
        # From 16:00 on, jump 16 hours ahead (16:00 becomes 08:00 next day).
        if moment.hour >= 16:
            moment = moment + timedelta(hours=16)
        # Apply the weekend skip after the overnight jump and one day at a
        # time, so Friday evening lands on Monday and a Sunday start does
        # not overshoot to Tuesday.
        while moment.isoweekday() > 5:
            moment = moment + timedelta(days=1)
        self.current_datetime = moment

        self.measurementsChanged()

    def addDate(self):
        """Advance the clock by one calendar day and notify observers."""
        # `datetime` is the class imported from the datetime module, so the
        # timedelta type has to be referenced by its own imported name.
        self.current_datetime = self.current_datetime + timedelta(days=1)
        print('add date' + str(self.current_datetime))
        self.measurementsChanged()


class run_daily_observer(Observer):
    """Observer that fires whenever a clock tick's time equals ``plan_time``."""

    def __init__(self, clock, plan_time=time(hour=9, minute=0)):
        """Register with *clock* and remember the time of day to fire at.

        Args:
            clock: Subject providing ``registerObserver``/``removeObserver``.
            plan_time: ``datetime.time`` at which ``display`` is triggered.
        """
        self.clock = clock
        # Placeholder values until the first tick arrives.
        self.current_datetime = datetime(year=2000, month=1, day=1)
        self.current_time = time(hour=0, minute=0)
        self.plan_time = plan_time
        clock.registerObserver(self)

    def remove(self):
        """Unregister this observer from the clock it was created with."""
        # Uses the stored clock rather than a module-level global, and takes
        # `self` so that it can actually be called on an instance.
        self.clock.removeObserver(self)

    def update(self, current_datetime):
        """Record the tick and call ``display`` when its time is ``plan_time``."""
        self.current_datetime = current_datetime
        self.current_time = current_datetime.time()
        if self.plan_time == self.current_time:
            self.display()

    def display(self):
        """Print the datetime and time of the tick that matched."""
        print('current_datetime = %s, current_time = %s.' % (str(self.current_datetime), str(self.current_time)))

class run_weekly_observer(Observer):
    """Observer that fires on one ISO weekday at a fixed time of day."""

    def __init__(self, clock, plan_week_day=1, plan_time=time(hour=9, minute=0)):
        """Store the schedule and subscribe to *clock*.

        Args:
            clock: Subject providing ``registerObserver``.
            plan_week_day: Value compared against ``isoweekday()`` of a tick.
            plan_time: ``datetime.time`` compared against the tick's time.
        """
        self.clock = clock
        self.plan_week_day = plan_week_day
        self.plan_time = plan_time
        # Placeholder until the first tick arrives.
        self.current_datetime = datetime(year=2000, month=1, day=1)
        clock.registerObserver(self)

    def update(self, current_datetime):
        """Record the tick; call ``display`` if weekday and time both match."""
        self.current_datetime = current_datetime
        if self.plan_week_day != current_datetime.isoweekday():
            return
        if self.plan_time != current_datetime.time():
            return
        self.display()

    def display(self):
        """Print the matched tick together with the planned weekday."""
        template = 'run_weekly_observer current_datetime: %s, plan_week_day = %s.'
        print(template % (self.current_datetime, self.plan_week_day))

class run_monthly_observer(Observer):
    """Observer that fires on one day of the month at a fixed time of day.

    Known limitation (author's note): non-trading days get skipped, so a
    planned day that is skipped never matches; this still needs handling.
    """

    def __init__(self, clock, plan_day=1, plan_time=time(hour=9, minute=0)):
        """Store the schedule and subscribe to *clock*.

        Args:
            clock: Subject providing ``registerObserver``.
            plan_day: Value compared against the ``day`` of a tick.
            plan_time: ``datetime.time`` compared against the tick's time.
        """
        self.clock = clock
        self.plan_day = plan_day
        self.plan_time = plan_time
        # Placeholder until the first tick arrives.
        self.current_datetime = datetime(year=2000, month=1, day=1)
        clock.registerObserver(self)

    def update(self, current_datetime):
        """Record the tick; call ``display`` if day and time both match."""
        self.current_datetime = current_datetime
        if self.plan_day != current_datetime.day:
            return
        if self.plan_time != current_datetime.time():
            return
        self.display()

    def display(self):
        """Print the matched tick together with the planned day of month."""
        template = 'run_monthly_observer current_datetime: %s, plan_day = %s.'
        print(template % (self.current_datetime, self.plan_day))


# Demo run: a clock with its default start and end dates.
clock = Clock()
# Alternative schedules, left disabled:
#plan_time = time(hour=8,minute=30)
#func1 = run_daily_observer(clock)
#func2 = run_weekly_observer(clock,3)
# Print a line whenever a tick falls on day 3 of a month at 13:00.
func3 = run_monthly_observer(clock, plan_day=3, plan_time=time(hour=13, minute=0))
clock.schedule()

打赏 支付宝打赏 微信打赏

如果文章对你有帮助,欢迎点击上方按钮打赏作者