我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用django.shortcuts.HttpResponse()。
def download_log(request): if request.method == 'GET': log_path = request.GET.get('log_path') log_name = request.GET.get('log_name') print('log_path:',log_path,'log_name:',log_name) #??zip????? zip_file_name = log_name + '.zip' #????????????? zip_dir = dao_config.log_dir_master + 'tmp/'+ zip_file_name archive = zipfile.ZipFile(zip_dir, 'w', zipfile.ZIP_DEFLATED) #??zip????????? archive.write(log_path) #???? archive.close() print(zip_dir) if os.path.isfile(zip_dir): response = StreamingHttpResponse(readFile(zip_dir)) response['Content-Type'] = 'application/octet-stream' response['Content-Disposition'] = 'attachment;filename="{0}"'.format(zip_file_name) return response else: return HttpResponse('??????')
def checkcode(request): # ????????? # file_path = "D:\python\mybbs02\statics\checkcodes" # file_name = 'checkcode' # img_str, img_data = CheckCode.gene_code(file_path, file_name) # ??gene_code???????????????????????????? img_str, img_data = CheckCode.gene_code() # ????????????session????login?????? request.session["checkcode"] = img_str # ??????????????????????<img src="/your_url/">????????? # img_data = open(file_name+".png", 'rb').read() return HttpResponse(img_data)
def login(request): if request.method == 'POST': # ??????????????? username = request.POST.get('username') password = request.POST.get('password') # ?form???????????? check_code = request.POST.get('checkcode') # ???? checkcod??????? session??????? session_code = request.session["checkcode"] # ????????? ???session??? ???? if check_code.strip().lower() != session_code.lower(): return HttpResponse('??????') else: return HttpResponse('?????') return render(request, 'login.html', {'error': "", 'username': '', 'password': ''})
def Subscribe(request, token): ''' ??ssr???? ''' username = token.split('&&')[0] user = base64.b64decode(username) try: user = User.objects.get(username=user) ss_user = user.ss_user except: return HttpResponse('ERROR') # ??token keys = base64.b64encode(bytes(user.username, 'utf-8')).decode('ascii') + \ '&&' + base64.b64encode(bytes(user.password, 'utf-8')).decode('ascii') if token == keys: # ???????? sub_code = '' # ?????????? node_list = Node.objects.filter(level__lte=user.level, show='??') for node in node_list: sub_code = sub_code + node.get_ssr_link(ss_user) + "\n" sub_code = base64.b64encode(bytes(sub_code, 'utf8')).decode('ascii') return HttpResponse(sub_code) else: return HttpResponse('ERROR')
def nodeData(request): ''' ?????? ????? ??????? ''' nodeName = [node.name for node in Node.objects.all()] nodeTraffic = [TrafficLog.totalTraffic( node.node_id) for node in Node.objects.all()] data = { 'nodeName': nodeName, 'nodeTraffic': nodeTraffic, } result = json.dumps(data, ensure_ascii=False) return HttpResponse(result, content_type='application/json')
def change_ss_port(request): ''' ????????? ?????? ''' user = request.user.ss_user # ??????????? port = SSUser.randomPord() user.port = port user.save() registerinfo = { 'title': '?????', 'subtitle': '??????{}?'.format(port), 'status': 'success', } result = json.dumps(registerinfo, ensure_ascii=False) # AJAX ??json?? return HttpResponse(result, content_type='application/json')
def get(self, request, *args, **kwargs): if request.GET.get('format', None) == 'json': self.setup_queryset(*args, **kwargs) # Put the project id into the context for the static_data_template if 'pid' in kwargs: self.static_context_extra['pid'] = kwargs['pid'] cmd = request.GET.get('cmd', None) if cmd and 'filterinfo' in cmd: data = self.get_filter_info(request, **kwargs) else: # If no cmd is specified we give you the table data data = self.get_data(request, **kwargs) return HttpResponse(data, content_type="application/json") return super(ToasterTable, self).get(request, *args, **kwargs)
def edit_comment(request, id=None): if request.method == 'POST': comment = get_object_or_404(Comment, id=id) content = request.POST.get('content') if comment.user != request.user: response = HttpResponse("You do not have permission to view this.") response.status_code = 403 return response parent_url = comment.content_object.get_absolute_url() comment.content = content comment.save() return HttpResponseRedirect(parent_url) return HttpResponseRedirect(request.META.get('HTTP_REFERER'))
def delete_comment(request, id=None): # if request.method == 'POST': # comment = get_object_or_404(Comment, id=id) try: comment = Comment.objects.get(id=id) except: raise Http404 if comment.user != request.user: response = HttpResponse("You do not have permission to view this.") response.status_code = 403 return response parent_url = comment.content_object.get_absolute_url() comment.delete() return HttpResponseRedirect(parent_url)
def update_log(request): if request.method == 'GET': all_log = request.GET.get('all_log') hostname = request.GET.get('hostname') container_name = request.GET.get('container_name') if hostname and container_name: docker_download_log_path = docker_initial().docker_update_log(hostname=hostname,container_name=container_name) return render(request, 'return_index.html', docker_download_log_path) elif all_log: docker_log_bak = docker_initial().docker_update_log(all_log=all_log) if docker_log_bak: return render(request, 'return_index.html',docker_log_bak) else: errors = {'return_results':'???????????!','log_name':None} return render(request, 'return_index.html', errors) return HttpResponse('???~')
def index(request): context = {} context['movie_list'] = [] context['types'] = TYPES if request.method == 'GET': movie_type = request.GET.get('type') q = request.GET.get('q') if q: # ????????????????? context['movie_list'] = conn['spider']['movie'].find({'title': {'$regex': q}}) context['current_cat'] = '????"{}"'.format(q) elif movie_type: # ?????????? if movie_type.encode('utf-8') in context['types']: context['movie_list'] = conn['spider']['movie'].find({'types': movie_type}) context['current_cat'] = movie_type else: return HttpResponse('??????') else: # ?????????????? context['movie_list'] = conn['spider']['movie'].find() context['current_cat'] = '????' return render(request, 'watchfilm/index.html', context)
def log_in(request): if request.method == 'POST': username = request.POST.get('username').strip() password = request.POST.get('password').strip() user = authenticate(username=username, password=password) if not user: User = get_user_model() query = User.objects.filter(email__iexact=username) if query: username = query[0].username user = authenticate(username=username, password=password) if user: if user.is_active: login(request, user) return redirect('/watchfilm/') else: return HttpResponse('not active') else: return HttpResponse('not valid') else: if request.method == 'GET': return render(request, 'watchfilm/login.html')
def asset_cpu(request): if request.method == "POST": ip = request.META['REMOTE_ADDR'] cpu_info = request.POST try: asset = models.Asset.objects.get(ip=ip) except: return HttpResponse("no asset") try: models.CPU.objects.get(asset = asset) models.CPU.objects.filter(asset = asset).update(cpu_model=cpu_info.get('cpu_model'), cpu_count=cpu_info.get('cpu_count'), cpu_core_count=cpu_info.get('cpu_core_count'),) return HttpResponse("cpu asset already update") except ObjectDoesNotExist: models.CPU.objects.create(asset=asset,cpu_model=cpu_info.get('cpu_model'), cpu_count=cpu_info.get('cpu_count'), cpu_core_count=cpu_info.get('cpu_core_count')) else: pass return HttpResponse("ok")
def check_connect(request): if request.method == 'POST': data = json.loads(request.body) ip = data.get("ip") port = int(data.get("port")) uname = data.get("uname") password = data.get("password") ssh = paramiko.SSHClient() ssh.load_system_host_keys() try: ssh.connect(ip, port, uname, password) except paramiko.AuthenticationException: ssh = None if ssh is not None: r = redis.Redis(host='localhost', port=6379, db=0) if not r.get(ip): r.set(ip, [port, uname, password]) return HttpResponse(json.dumps({"result": "success"}), content_type="application/json") else: return HttpResponse(json.dumps({"result": "failed"}), content_type="application/json") else: return Http404
def get(self, request, orgid): ''' API Provide download file template :param request: :param orgid: :return: ''' return render(request,'upload/upload.html') msg = { "status": 1, "data": '', "msg": ''} if self.verification(request) or 1: msg['status'] = 0 msg['data'] = '/static/filetamplate/asset_template.xlsx' # msg['msg'] = 'Verification does not pass' return HttpResponse(json.dumps(msg))
def post(self, request, orgid): msg = {"status": 0, "data": '', "msg": None} file_path = self.wirte_excel(request) if not file_path: msg['status'] = 1 msg['msg'] = "?????????????" return HttpResponse(json.dumps(msg)) data = self.read_excel(file_path) if not data: msg['status'] = 1 msg['msg'] = "The file has no content" return HttpResponse(json.dumps(msg)) data['orgid'] = orgid msg['data'] = data return HttpResponse(json.dumps(data))
def get(self, request, orgid): msg = {"status": 1, "data": '', "msg": None} condition = request.GET.get('condition') data_sum_group_items = Asset.objects.filter(orgid=orgid).values('%s' % condition).annotate( c_sum=Count('%s' % condition)) data = [] for item in data_sum_group_items: tmp = {} tmp['name'] = item['%s' % condition] tmp['sum_nub'] = item['c_sum'] data.append(tmp) if not data: msg['status'] = 0 msg['msg'] = 'No data has been found' msg['data'] = data return HttpResponse(json.dumps(data))
def dispatch(self, request, *args, **kwargs): self.request = request self.response_data = {"msg": "ok", "status": "-1", "data": {}} if request.method in [item.upper() for item in self.ALLOWED_METHOD]: method = getattr(self, request.method.lower()) if request.method.lower() in ["post",]: self.request.data = json.loads(u"".join([item.decode("utf8")for item in self.request.readlines()])) respose = method(request, *args, **kwargs) if isinstance(respose, (list, dict)): self.response_data["data"] = respose self.response_data["status"] = "1" else: self.response_data["msg"] = respose return HttpResponse(json.dumps(self.response_data, ensure_ascii=False), status=200, content_type=" application/json")
def get_disk(request, hostname, timing, partition): data_time = [] disk_percent = [] disk_name = "" range_time = TIME_SECTOR[int(timing)] disk = GetSysData(hostname, "disk", range_time) for doc in disk.get_data(): unix_time = doc['timestamp'] times = time.localtime(unix_time) dt = time.strftime("%m%d-%H:%M", times) data_time.append(dt) d_percent = doc['disk'][int(partition)]['percent'] disk_percent.append(d_percent) if not disk_name: disk_name = doc['disk'][int(partition)]['mountpoint'] data = {"data_time": data_time, "disk_name": disk_name, "disk_percent": disk_percent} return HttpResponse(json.dumps(data))
def del_monitor_data(request, timing): timing = int(timing) if timing == 4: db = GetSysData.connect_db() db.drop_database("sys_info") else: host_list = Host.objects.all() client = GetSysData.connect_db() db = client.sys_info for host in host_list: try: collection = db[host] except: continue now_time = int(time.time()) del_time = now_time-TIME_SECTOR[int(timing)] collection.remove({'timestamp': {'$lte': del_time}}, {"timestamp": 1}) return HttpResponse("ok")
def get_dir(args): config = ConfigParser.RawConfigParser() dirs = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) with open(dirs+'/adminset.conf', 'r') as cfgfile: config.readfp(cfgfile) a_path = config.get('config', 'ansible_path') r_path = config.get('config', 'roles_path') p_path = config.get('config', 'playbook_path') s_path = config.get('config', 'scripts_path') token = config.get('token', 'token') ssh_pwd = config.get('token', 'ssh_pwd') log_path = config.get('log', 'log_path') log_level = config.get('log', 'log_level') mongodb_ip = config.get('mongodb', 'mongodb_ip') mongodb_port = config.get('mongodb', 'mongodb_port') mongodb_user = config.get('mongodb', 'mongodb_user') mongodb_pwd = config.get('mongodb', 'mongodb_pwd') mongodb_collection = config.get('mongodb', 'collection') webssh_domain = config.get('webssh', 'domain') # ??????????????????????????? if args: return vars()[args] else: return HttpResponse(status=403)
def insexport(request): LOGGER.debug('Proceedings export page accessed', request) uid = request.user.id uname = request.user.username res = Insolvency.objects.filter(uid=uid).order_by('desc', 'pk').distinct() response = HttpResponse(content_type='text/csv; charset=utf-8') response['Content-Disposition'] = 'attachment; filename=sir.csv' writer = csvwriter(response) for idx in res: dat = ( idx.desc, str(idx.number), str(idx.year), 'ano' if idx.detailed else 'ne', ) writer.writerow(dat) LOGGER.info('User "{}" ({:d}) exported proceedings'.format(uname, uid), request) return response
def hostgroup_append(request): group_name = request.POST.get('groupname', None) group_desc = request.POST.get('groupdesc', None) group_id = request.POST.get('groupid', None) if group_id and group_name: try: cmdb_models.HostGroup.objects.filter(id=group_id).update( host_group_name=group_name, host_group_jd=group_desc) result_dict = {'flag': 1, 'msg': 'GroupName: %s update successful' % group_name} except Exception: result_dict = {'flag': 0, 'msg': 'GroupName: %s already exist' % group_name} elif group_name: result = cmdb_models.HostGroup.objects.filter(host_group_name=group_name) if result: result_dict = {'flag': 0, 'msg': 'GroupName: %s already exist' % group_name} else: cmdb_models.HostGroup.objects.create( host_group_name=group_name, host_group_jd=group_desc ) result_dict = {'flag': 1, 'msg': 'GroupName: %s append successful' % group_name} else: result_dict = {'flag': 0, 'msg': 'GroupName: is None'} return HttpResponse(json.dumps(result_dict))
def group_del(request): hid_list = request.POST.get('host_list', None) gid_list = request.POST.get('group_list', None) group_id_list = [] host_id_list = [] if len(hid_list): for item in hid_list.split(','): host_id_list.append(int(item)) if len(gid_list): for item in gid_list.split(','): group_id_list.append(int(item)) data = {'msg': '', 'flag': 1} try: if len(host_id_list): r = cmdb_models.HostInfo.objects.filter(id__in=host_id_list).delete() data['msg'] = r if len(group_id_list): r = cmdb_models.HostGroup.objects.filter(id__in=group_id_list).delete() data['msg'] = r except Exception as e: data['msg'] = e data['flag'] = 0 return HttpResponse(json.dumps(data))
def post(self, request, pk): with transaction.atomic(): user = User.objects.select_for_update().get(pk=pk) if self.request.user.privilege == Privilege.ROOT: if user.privilege == 'user': user.privilege = 'volunteer' elif user.privilege == 'volunteer': user.privilege = 'admin' elif user.privilege == 'admin': user.privilege = 'user' elif self.request.user.privilege == Privilege.ADMIN: if user.privilege == 'user': user.privilege = 'volunteer' elif user.privilege == 'volunteer': user.privilege = 'user' user.save() return HttpResponse(json.dumps({'result': 'success'}))
def get(self, request, *args, **kwargs): if 'data' in request.GET: problems = self.contest.contestproblem_set.select_related('problem').all() data = [] SUB_FIELDS = ["title", "id", "alias"] STATISTIC_FIELDS = [ ('ac1', get_problem_accept_count), ('ac2', get_problem_accept_user_count), ('tot1', get_problem_all_count), ('tot2', get_problem_all_user_count), ('ratio1', get_problem_accept_ratio), ('ratio2', get_problem_accept_user_ratio), ] for problem in problems: d = {k: getattr(problem.problem, k) for k in SUB_FIELDS} d.update(pid=problem.id, identifier=problem.identifier, weight=problem.weight) d.update({k: v(problem.problem_id, self.contest.id) for k, v in STATISTIC_FIELDS}) data.append(d) data.sort(key=lambda x: x['identifier']) return HttpResponse(json.dumps(data)) return super(ContestProblemManage, self).get(request, *args, **kwargs)
def search_images(request): term = request.POST['term'] images = requests.get('http://localhost:' + DOCKER_API_PORT + '/images/search?term=' + term) return HttpResponse(json.dumps(images.json()), content_type='application/json')
def test_firebase(request): firebase_db = FirebaseApplication('https://vkusotiiki-bg.firebaseio.com/', None) # no authentication users = firebase_db.get('/User', None) print(users) return HttpResponse('Heeey', {})
def testcheck(request): '''test url ''' # auto_register(10) # do some test page # check_user_state() # clean_traffic_log() return HttpResponse('ok')
def test(request): '''??api''' data = { 'user': [1, 2, 3, 4] } result = json.dumps(data, ensure_ascii=False) return HttpResponse(result, content_type='application/json')
def userData(request): ''' ??????? ??????????????????? ''' data = [NodeOnlineLog.totalOnlineUser(), len(User.todayRegister()), SSUser.userTodyChecked(), SSUser.userNeverChecked(), SSUser.userNeverUsed(), ] result = json.dumps(data, ensure_ascii=False) return HttpResponse(result, content_type='application/json')
def gen_invite_code(request): ''' ???????? ?????? ''' u = request.user if u.pk == 1: # ??????????????5???? num = 5 else: num = u.invitecode_num - len(InviteCode.objects.filter(code_id=u.pk)) if num > 0: for i in range(num): code = InviteCode(type=0, code_id=u.pk) code.save() registerinfo = { 'title': '??', 'subtitle': '?????{}?,?????'.format(num), 'status': 'success', } else: registerinfo = { 'title': '??', 'subtitle': '?????????????', 'status': 'error', } result = json.dumps(registerinfo, ensure_ascii=False) return HttpResponse(result, content_type='application/json')
def showapi(request): ## ?? name_id = models.JiguiInfo.objects.filter(id__gt=0) name = [] jq = [] for i in name_id: name.append(i.name) jq.append(i.jq) ret={'name':name,'jq':jq} return HttpResponse(json.dumps(ret))
def delete_jigui(request):##???? ret = {'status': True, 'error': None, 'data': None} if request.method == "POST": ids = request.POST.getlist('id') idstring = ','.join(ids) models.JiguiInfo.objects.extra(where=['id IN (' + idstring + ')']).delete() return HttpResponse(json.dumps(ret))
def shdel(request): # ?? ret = {'status': True, 'error': None, 'data': None} if request.method == "POST": id = request.POST.get('id',None) obj1 = ToolsScript.objects.filter(id=id).delete() return HttpResponse(json.dumps(ret))
def shdelall(request):##???? ret = {'status': True, 'error': None, 'data': None} if request.method == "POST": ids = request.POST.getlist('id') idstring = ','.join(ids) ToolsScript.objects.extra(where=['id IN (' + idstring + ')']).delete() return HttpResponse(json.dumps(ret))
def host_change_password(request): ##???? if request.method == 'POST': id = request.POST.get('id', None) obj = Host.objects.filter(id=id).first() password = obj.password return HttpResponse(password)
def host_del(request): ##?? ret = {'status': True, 'error': None, 'data': None} if request.method == 'POST': try: id = request.POST.get('id', None) obj = Host.objects.filter(id=id).delete() except Exception as e: ret['status'] = False ret['error'] = '??????' return HttpResponse(json.dumps(ret))
def cmd(request): ##??? if request.method == "GET": obj = Host.objects.filter(id__gt=0) return render(request, 'host/cmd.html', {"host_list": obj, }) if request.method == 'POST': ids = request.POST.getlist('id') cmd = request.POST.get('cmd', None) idstring = ','.join(ids) if not ids: error1 = "?????" ret = {"error": error1, "status": False} return HttpResponse(json.dumps(ret)) elif not cmd: error1 = "?????" ret = {"error": error1, "status": False} return HttpResponse(json.dumps(ret)) obj1 = Host.objects.extra(where=['id IN (' + idstring + ')']) x = {} x['status'] = True x['data'] = [] for i in obj1: a = ssh(ip=i.ip,port=i.port,username=i.username,password=i.password,cmd=cmd) history = History.objects.create(ip=i.ip, root=i.username, port=i.port, cmd=cmd, user=i.username) x['data'].append(a) return HttpResponse(json.dumps(x))
def hostall_del(request): ##???? ret = {'status': True, 'error': None, 'data': None} if request.method == "POST": ids = request.POST.getlist('id') idstring = ','.join(ids) Host.objects.extra(where=['id IN (' + idstring + ')']).delete() return HttpResponse(json.dumps(ret))
def host_network_api(request): # ????api if request.method == 'GET': id = request.GET.get('id', None) all = Monitor.objects.all() date = [] in_use = [] out_use = [] for i in all: if i.server_id == int(id): date.append(i.create_date.strftime("%m-%d %H:%M")) in_use.append(i.in_use) out_use.append(i.out_use) ret = {'date': date, 'in_use': in_use, 'out_use': out_use} return HttpResponse(json.dumps(ret))
def host_web_ssh(request): ## web ssh ?? if request.method == 'POST': id = request.POST.get('id', None) obj = Host.objects.filter(id=id).first() ip = obj.ip+":"+obj.port username = obj.username password = obj.password ret = {"ip":ip,"username":username,'password':password,"static":True} return HttpResponse(json.dumps(ret))
def get_cities(request): ''' Returns all the weather stations in database ''' weather_stations = WeatherStation.objects.all() serialized_data = serialize_weather_stations(weather_stations) return HttpResponse(json.dumps(serialized_data), content_type="application/json")
def add_weather_station(request): ''' Add a new weather station in database ''' if request.method == 'POST': response = json.loads(request.body) zmw = response["zmw"] name = response["name"] country = response["country"] weather_station = WeatherStation.objects.get_or_create( name=name, country=country, zmw=zmw) return HttpResponse(json.dumps({"message": "added"}), content_type="application/json")
def get(self, request, *args, **kwargs): def response(data): return HttpResponse(json.dumps(data, indent=2, cls=DjangoJSONEncoder), content_type="application/json") error = "ok" search_term = request.GET.get("search", None) if search_term is None: # We got no search value so return empty reponse return response({'error': error, 'results': []}) try: prj = Project.objects.get(pk=kwargs['pid']) except KeyError: prj = None results = self.apply_search(search_term, prj, request)[:ToasterTypeAhead.MAX_RESULTS] if len(results) > 0: try: self.validate_fields(results[0]) except self.MissingFieldsException as e: error = e data = {'results': results, 'error': error} return response(data)
def index(request): # request ?????????self????????????????? # request.POSt # request.GET # return HttpResponse("Hello Yuan Xin ! ") # ??????????????????????django??????python???? if request.method == "POST": username = request.POST.get("username", None) password = request.POST.get("password", None) temp = {"user":username,"pwd":password} user_list.append(temp) return render(request, "index.html", {"data":user_list})
def clustering_pubs_status_page(request, index_name): dci = DocClusteringInfo.objects.get(index_name=index_name) if request.GET.get('type', 'HTML') == 'JSON': result = json.dumps({'status': dci.status, 'k': dci.k, 'iter': dci.iter, 'cost': dci.cost}, ensure_ascii=False) return HttpResponse(result, content_type='application/json; charset=utf-8') return render(request, 'clustering_pubs_status.html')
def user_logout(self, request, redirect=None): if not redirect: # Try getting it from the query string redirect = self.request.GET.get("redirect", None) if redirect: return HttpResponse(redirect) else: return HttpResponse(getsettings('url_prefix','/'))
def get(self,request): check = self.request.GET.get('check',False) if check in ['true','false',False]:#solve ast malformed string exception check = {'true':True,'false':False}[str(check).lower()] else: check = ast.literal_eval(check) if self.request.user == 'AnonymousUser': user = {'upn': 'ANONYMOUS'} else: user = {'upn': str(self.request.user)} if check and self.request.user.is_authenticated(): response = HttpResponse(u'authenticated') response["Access-Control-Allow-Origin"] = "*" response["Server"] = "GateOne" return response logout_get = self.request.GET.get("logout", None) if logout_get: logout(request) response = HttpResponse('/') response.delete_cookie('gateone_user') self.user_logout(request) return response next_url = self.request.GET.get("next", None) if next_url: return redirect(next_url) return redirect(getsettings('url_prefix','/'))